001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.nio.ByteBuffer;
022import java.util.Set;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.ConcurrentSkipListSet;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.concurrent.atomic.AtomicReference;
029import java.util.concurrent.locks.ReentrantLock;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ByteBufferExtendedCell;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.ExtendedCell;
035import org.apache.hadoop.hbase.KeyValueUtil;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
041/**
042 * A memstore-local allocation buffer.
043 * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
 * big (2MB) byte[] chunks and then doles them out to threads that request
 * slices into the array.
047 * <p>
048 * The purpose of this class is to combat heap fragmentation in the
049 * regionserver. By ensuring that all Cells in a given memstore refer
050 * only to large chunks of contiguous memory, we ensure that large blocks
051 * get freed up when the memstore is flushed.
052 * <p>
 * Without the MSLAB, the byte arrays allocated during insertion end up
054 * interleaved throughout the heap, and the old generation gets progressively
055 * more fragmented until a stop-the-world compacting collection occurs.
056 * <p>
057 * TODO: we should probably benchmark whether word-aligning the allocations
058 * would provide a performance improvement - probably would speed up the
059 * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
060 * anyway.
 * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}.
 * When a chunk comes from the pool, it can be either an on-heap or an off-heap backed chunk.
 * The chunks which this MemStoreLAB creates on its own (when no chunk is available from the
 * pool) are always on-heap backed.
065 */
066@InterfaceAudience.Private
067public class MemStoreLABImpl implements MemStoreLAB {
068
069  static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class);
070
071  private AtomicReference<Chunk> currChunk = new AtomicReference<>();
072  // Lock to manage multiple handlers requesting for a chunk
073  private ReentrantLock lock = new ReentrantLock();
074
075  // A set of chunks contained by this memstore LAB
076  @VisibleForTesting
077  Set<Integer> chunks = new ConcurrentSkipListSet<Integer>();
078  private final int dataChunkSize;
079  private final int maxAlloc;
080  private final ChunkCreator chunkCreator;
081  private final CompactingMemStore.IndexType idxType; // what index is used for corresponding segment
082
083  // This flag is for closing this instance, its set when clearing snapshot of
084  // memstore
085  private volatile boolean closed = false;
086  // This flag is for reclaiming chunks. Its set when putting chunks back to
087  // pool
088  private AtomicBoolean reclaimed = new AtomicBoolean(false);
089  // Current count of open scanners which reading data from this MemStoreLAB
090  private final AtomicInteger openScannerCount = new AtomicInteger();
091
092  // Used in testing
093  public MemStoreLABImpl() {
094    this(new Configuration());
095  }
096
097  public MemStoreLABImpl(Configuration conf) {
098    dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
099    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
100    this.chunkCreator = ChunkCreator.getInstance();
101    // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
102    Preconditions.checkArgument(maxAlloc <= dataChunkSize,
103        MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
104
105    // if user requested to work with MSLABs (whether on- or off-heap), then the
106    // immutable segments are going to use CellChunkMap as their index
107    idxType = CompactingMemStore.IndexType.CHUNK_MAP;
108  }
109
110  @Override
111  public Cell copyCellInto(Cell cell) {
112    // See head of copyBBECellInto for how it differs from copyCellInto
113    return (cell instanceof ByteBufferExtendedCell)?
114        copyBBECellInto((ByteBufferExtendedCell)cell, maxAlloc):
115        copyCellInto(cell, maxAlloc);
116  }
117
118  /**
119   * When a cell's size is too big (bigger than maxAlloc),
120   * copyCellInto does not allocate it on MSLAB.
121   * Since the process of flattening to CellChunkMap assumes that
122   * all cells are allocated on MSLAB, during this process,
123   * the big cells are copied into MSLAB using this method.
124   */
125  @Override
126  public Cell forceCopyOfBigCellInto(Cell cell) {
127    int size = Segment.getCellLength(cell);
128    size += ChunkCreator.SIZEOF_CHUNK_HEADER;
129    Preconditions.checkArgument(size >= 0, "negative size");
130    if (size <= dataChunkSize) {
131      // Using copyCellInto for cells which are bigger than the original maxAlloc
132      return copyCellInto(cell, dataChunkSize);
133    } else {
134      Chunk c = getNewExternalChunk(size);
135      int allocOffset = c.alloc(size);
136      return copyToChunkCell(cell, c.getData(), allocOffset, size);
137    }
138  }
139
140  /**
141   * Mostly a duplicate of {@link #copyCellInto(Cell, int)}} done for perf sake. It presumes
142   * ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than the
143   * super generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline where
144   * before it was too big. Uses less CPU. See HBASE-20875 for evidence.
145   * @see #copyCellInto(Cell, int)
146   */
147  private Cell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) {
148    int size = cell.getSerializedSize();
149    Preconditions.checkArgument(size >= 0, "negative size");
150    // Callers should satisfy large allocations from JVM heap so limit fragmentation.
151    if (size > maxAlloc) {
152      return null;
153    }
154    Chunk c = null;
155    int allocOffset = 0;
156    while (true) {
157      // Try to get the chunk
158      c = getOrMakeChunk();
159      // We may get null because the some other thread succeeded in getting the lock
160      // and so the current thread has to try again to make its chunk or grab the chunk
161      // that the other thread created
162      // Try to allocate from this chunk
163      if (c != null) {
164        allocOffset = c.alloc(size);
165        if (allocOffset != -1) {
166          // We succeeded - this is the common case - small alloc
167          // from a big buffer
168          break;
169        }
170        // not enough space!
171        // try to retire this chunk
172        tryRetireChunk(c);
173      }
174    }
175    return copyBBECToChunkCell(cell, c.getData(), allocOffset, size);
176  }
177
178  /**
179   * @see #copyBBECellInto(ByteBufferExtendedCell, int)
180   */
181  private Cell copyCellInto(Cell cell, int maxAlloc) {
182    int size = Segment.getCellLength(cell);
183    Preconditions.checkArgument(size >= 0, "negative size");
184    // Callers should satisfy large allocations directly from JVM since they
185    // don't cause fragmentation as badly.
186    if (size > maxAlloc) {
187      return null;
188    }
189    Chunk c = null;
190    int allocOffset = 0;
191    while (true) {
192      // Try to get the chunk
193      c = getOrMakeChunk();
194      // we may get null because the some other thread succeeded in getting the lock
195      // and so the current thread has to try again to make its chunk or grab the chunk
196      // that the other thread created
197      // Try to allocate from this chunk
198      if (c != null) {
199        allocOffset = c.alloc(size);
200        if (allocOffset != -1) {
201          // We succeeded - this is the common case - small alloc
202          // from a big buffer
203          break;
204        }
205        // not enough space!
206        // try to retire this chunk
207        tryRetireChunk(c);
208      }
209    }
210    return copyToChunkCell(cell, c.getData(), allocOffset, size);
211  }
212
213  /**
214   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
215   * out of it
216   * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int)
217   */
218  private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
219    int tagsLen = cell.getTagsLength();
220    if (cell instanceof ExtendedCell) {
221      ((ExtendedCell) cell).write(buf, offset);
222    } else {
223      // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
224      // other case also. The data fragments within Cell is copied into buf as in KeyValue
225      // serialization format only.
226      KeyValueUtil.appendTo(cell, buf, offset, true);
227    }
228    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
229  }
230
231  /**
232   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
233   * out of it
234   * @see #copyToChunkCell(Cell, ByteBuffer, int, int)
235   */
236  private static Cell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf, int offset,
237      int len) {
238    int tagsLen = cell.getTagsLength();
239    cell.write(buf, offset);
240    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
241  }
242
243  private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen,
244      long sequenceId) {
245    // TODO : write the seqid here. For writing seqId we should create a new cell type so
246    // that seqId is not used as the state
247    if (tagsLen == 0) {
248      // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
249      // which directly return tagsLen as 0. So we avoid parsing many length components in
250      // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
251      // call getTagsLength().
252      return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId);
253    } else {
254      return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId);
255    }
256  }
257
258  /**
259   * Close this instance since it won't be used any more, try to put the chunks
260   * back to pool
261   */
262  @Override
263  public void close() {
264    this.closed = true;
265    // We could put back the chunks to pool for reusing only when there is no
266    // opening scanner which will read their data
267    int count  = openScannerCount.get();
268    if(count == 0) {
269      recycleChunks();
270    }
271  }
272
273  @VisibleForTesting
274  int getOpenScannerCount() {
275    return this.openScannerCount.get();
276  }
277
278  /**
279   * Called when opening a scanner on the data of this MemStoreLAB
280   */
281  @Override
282  public void incScannerCount() {
283    this.openScannerCount.incrementAndGet();
284  }
285
286  /**
287   * Called when closing a scanner on the data of this MemStoreLAB
288   */
289  @Override
290  public void decScannerCount() {
291    int count = this.openScannerCount.decrementAndGet();
292    if (this.closed && count == 0) {
293      recycleChunks();
294    }
295  }
296
297  private void recycleChunks() {
298    if (reclaimed.compareAndSet(false, true)) {
299      chunkCreator.putbackChunks(chunks);
300    }
301  }
302
303  /**
304   * Try to retire the current chunk if it is still
305   * <code>c</code>. Postcondition is that curChunk.get()
306   * != c
307   * @param c the chunk to retire
308   */
309  private void tryRetireChunk(Chunk c) {
310    currChunk.compareAndSet(c, null);
311    // If the CAS succeeds, that means that we won the race
312    // to retire the chunk. We could use this opportunity to
313    // update metrics on external fragmentation.
314    //
315    // If the CAS fails, that means that someone else already
316    // retired the chunk for us.
317  }
318
319  /**
320   * Get the current chunk, or, if there is no current chunk,
321   * allocate a new one from the JVM.
322   */
323  private Chunk getOrMakeChunk() {
324    // Try to get the chunk
325    Chunk c = currChunk.get();
326    if (c != null) {
327      return c;
328    }
329    // No current chunk, so we want to allocate one. We race
330    // against other allocators to CAS in an uninitialized chunk
331    // (which is cheap to allocate)
332    if (lock.tryLock()) {
333      try {
334        // once again check inside the lock
335        c = currChunk.get();
336        if (c != null) {
337          return c;
338        }
339        c = this.chunkCreator.getChunk(idxType);
340        if (c != null) {
341          // set the curChunk. No need of CAS as only one thread will be here
342          currChunk.set(c);
343          chunks.add(c.getId());
344          return c;
345        }
346      } finally {
347        lock.unlock();
348      }
349    }
350    return null;
351  }
352
353  /* Returning a new pool chunk, without replacing current chunk,
354  ** meaning MSLABImpl does not make the returned chunk as CurChunk.
355  ** The space on this chunk will be allocated externally.
356  ** The interface is only for external callers.
357  */
358  @Override
359  public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) {
360    switch (chunkType) {
361      case INDEX_CHUNK:
362      case DATA_CHUNK:
363        Chunk c = this.chunkCreator.getChunk(chunkType);
364        chunks.add(c.getId());
365        return c;
366      case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size
367      default:
368        return null;
369    }
370  }
371
372  /* Returning a new chunk, without replacing current chunk,
373  ** meaning MSLABImpl does not make the returned chunk as CurChunk.
374  ** The space on this chunk will be allocated externally.
375  ** The interface is only for external callers.
376  ** Chunks from pools are not allocated from here, since they have fixed sizes
377  */
378  @Override
379  public Chunk getNewExternalChunk(int size) {
380    int allocSize = size + ChunkCreator.getInstance().SIZEOF_CHUNK_HEADER;
381    if (allocSize <= ChunkCreator.getInstance().getChunkSize()) {
382      return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK);
383    } else {
384      Chunk c = this.chunkCreator.getJumboChunk(size);
385      chunks.add(c.getId());
386      return c;
387    }
388  }
389
390  @Override
391  public boolean isOnHeap() {
392    return !isOffHeap();
393  }
394
395  @Override
396  public boolean isOffHeap() {
397    return this.chunkCreator.isOffheap();
398  }
399
400  @VisibleForTesting
401  Chunk getCurrentChunk() {
402    return currChunk.get();
403  }
404
405  @VisibleForTesting
406  BlockingQueue<Chunk> getPooledChunks() {
407    BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
408    for (Integer id : this.chunks) {
409      Chunk chunk = chunkCreator.getChunk(id);
410      if (chunk != null && chunk.isFromPool()) {
411        pooledChunks.add(chunk);
412      }
413    }
414    return pooledChunks;
415  }
416
417  @VisibleForTesting Integer getNumOfChunksReturnedToPool() {
418    int i = 0;
419    for (Integer id : this.chunks) {
420      if (chunkCreator.isChunkInPool(id)) {
421        i++;
422      }
423    }
424    return i;
425  }
426}