001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.nio.ByteBuffer;
022import java.util.Set;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.ConcurrentSkipListSet;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.concurrent.atomic.AtomicReference;
029import java.util.concurrent.locks.ReentrantLock;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.ByteBufferExtendedCell;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.ExtendedCell;
035import org.apache.hadoop.hbase.KeyValueUtil;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
041/**
042 * A memstore-local allocation buffer.
043 * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
 * big (2MB) byte[] chunks and then doles them out to threads that request
 * slices into the array.
047 * <p>
048 * The purpose of this class is to combat heap fragmentation in the
049 * regionserver. By ensuring that all Cells in a given memstore refer
050 * only to large chunks of contiguous memory, we ensure that large blocks
051 * get freed up when the memstore is flushed.
052 * <p>
 * Without the MSLAB, the byte arrays allocated during insertion end up
 * interleaved throughout the heap, and the old generation gets progressively
 * more fragmented until a stop-the-world compacting collection occurs.
056 * <p>
057 * TODO: we should probably benchmark whether word-aligning the allocations
058 * would provide a performance improvement - probably would speed up the
059 * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
060 * anyway.
 * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}.
 * When a Chunk comes from the pool, it can be either an on-heap or an off-heap backed chunk.
 * The chunks which this MemStoreLAB creates on its own (when no chunk is available from the
 * pool) are always on-heap backed.
065 */
066@InterfaceAudience.Private
067public class MemStoreLABImpl implements MemStoreLAB {
068
069  static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class);
070
071  private AtomicReference<Chunk> currChunk = new AtomicReference<>();
072  // Lock to manage multiple handlers requesting for a chunk
073  private ReentrantLock lock = new ReentrantLock();
074
075  // A set of chunks contained by this memstore LAB
076  @VisibleForTesting
077  Set<Integer> chunks = new ConcurrentSkipListSet<Integer>();
078  private final int dataChunkSize;
079  private final int maxAlloc;
080  private final ChunkCreator chunkCreator;
081  private final CompactingMemStore.IndexType idxType; // what index is used for corresponding segment
082
083  // This flag is for closing this instance, its set when clearing snapshot of
084  // memstore
085  private volatile boolean closed = false;
086  // This flag is for reclaiming chunks. Its set when putting chunks back to
087  // pool
088  private AtomicBoolean reclaimed = new AtomicBoolean(false);
089  // Current count of open scanners which reading data from this MemStoreLAB
090  private final AtomicInteger openScannerCount = new AtomicInteger();
091
092  // Used in testing
093  public MemStoreLABImpl() {
094    this(new Configuration());
095  }
096
097  public MemStoreLABImpl(Configuration conf) {
098    dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
099    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
100    this.chunkCreator = ChunkCreator.getInstance();
101    // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
102    Preconditions.checkArgument(maxAlloc <= dataChunkSize,
103        MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
104
105    // if user requested to work with MSLABs (whether on- or off-heap), then the
106    // immutable segments are going to use CellChunkMap as their index
107    idxType = CompactingMemStore.IndexType.CHUNK_MAP;
108  }
109
110  @Override
111  public Cell copyCellInto(Cell cell) {
112    // See head of copyBBECellInto for how it differs from copyCellInto
113    return (cell instanceof ByteBufferExtendedCell)?
114        copyBBECellInto((ByteBufferExtendedCell)cell, maxAlloc):
115        copyCellInto(cell, maxAlloc);
116  }
117
118  /**
119   * When a cell's size is too big (bigger than maxAlloc),
120   * copyCellInto does not allocate it on MSLAB.
121   * Since the process of flattening to CellChunkMap assumes that
122   * all cells are allocated on MSLAB, during this process,
123   * the big cells are copied into MSLAB using this method.
124   */
125  @Override
126  public Cell forceCopyOfBigCellInto(Cell cell) {
127    int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
128        KeyValueUtil.length(cell);
129    size += ChunkCreator.SIZEOF_CHUNK_HEADER;
130    Preconditions.checkArgument(size >= 0, "negative size");
131    if (size <= dataChunkSize) {
132      // Using copyCellInto for cells which are bigger than the original maxAlloc
133      return copyCellInto(cell, dataChunkSize);
134    } else {
135      Chunk c = getNewExternalChunk(size);
136      int allocOffset = c.alloc(size);
137      return copyToChunkCell(cell, c.getData(), allocOffset, size);
138    }
139  }
140
141  /**
142   * Mostly a duplicate of {@link #copyCellInto(Cell, int)}} done for perf sake. It presumes
143   * ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than the
144   * super generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline where
145   * before it was too big. Uses less CPU. See HBASE-20875 for evidence.
146   * @see #copyCellInto(Cell, int)
147   */
148  private Cell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) {
149    int size = cell.getSerializedSize();
150    Preconditions.checkArgument(size >= 0, "negative size");
151    // Callers should satisfy large allocations from JVM heap so limit fragmentation.
152    if (size > maxAlloc) {
153      return null;
154    }
155    Chunk c = null;
156    int allocOffset = 0;
157    while (true) {
158      // Try to get the chunk
159      c = getOrMakeChunk();
160      // We may get null because the some other thread succeeded in getting the lock
161      // and so the current thread has to try again to make its chunk or grab the chunk
162      // that the other thread created
163      // Try to allocate from this chunk
164      if (c != null) {
165        allocOffset = c.alloc(size);
166        if (allocOffset != -1) {
167          // We succeeded - this is the common case - small alloc
168          // from a big buffer
169          break;
170        }
171        // not enough space!
172        // try to retire this chunk
173        tryRetireChunk(c);
174      }
175    }
176    return copyBBECToChunkCell(cell, c.getData(), allocOffset, size);
177  }
178
179  /**
180   * @see #copyBBECellInto(ByteBufferExtendedCell, int)
181   */
182  private Cell copyCellInto(Cell cell, int maxAlloc) {
183    int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
184        KeyValueUtil.length(cell);
185    Preconditions.checkArgument(size >= 0, "negative size");
186    // Callers should satisfy large allocations directly from JVM since they
187    // don't cause fragmentation as badly.
188    if (size > maxAlloc) {
189      return null;
190    }
191    Chunk c = null;
192    int allocOffset = 0;
193    while (true) {
194      // Try to get the chunk
195      c = getOrMakeChunk();
196      // we may get null because the some other thread succeeded in getting the lock
197      // and so the current thread has to try again to make its chunk or grab the chunk
198      // that the other thread created
199      // Try to allocate from this chunk
200      if (c != null) {
201        allocOffset = c.alloc(size);
202        if (allocOffset != -1) {
203          // We succeeded - this is the common case - small alloc
204          // from a big buffer
205          break;
206        }
207        // not enough space!
208        // try to retire this chunk
209        tryRetireChunk(c);
210      }
211    }
212    return copyToChunkCell(cell, c.getData(), allocOffset, size);
213  }
214
215  /**
216   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
217   * out of it
218   * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int)
219   */
220  private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
221    int tagsLen = cell.getTagsLength();
222    if (cell instanceof ExtendedCell) {
223      ((ExtendedCell) cell).write(buf, offset);
224    } else {
225      // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
226      // other case also. The data fragments within Cell is copied into buf as in KeyValue
227      // serialization format only.
228      KeyValueUtil.appendTo(cell, buf, offset, true);
229    }
230    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
231  }
232
233  /**
234   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
235   * out of it
236   * @see #copyToChunkCell(Cell, ByteBuffer, int, int)
237   */
238  private static Cell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf, int offset,
239      int len) {
240    int tagsLen = cell.getTagsLength();
241    cell.write(buf, offset);
242    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
243  }
244
245  private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen,
246      long sequenceId) {
247    // TODO : write the seqid here. For writing seqId we should create a new cell type so
248    // that seqId is not used as the state
249    if (tagsLen == 0) {
250      // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
251      // which directly return tagsLen as 0. So we avoid parsing many length components in
252      // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
253      // call getTagsLength().
254      return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId);
255    } else {
256      return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId);
257    }
258  }
259
260  /**
261   * Close this instance since it won't be used any more, try to put the chunks
262   * back to pool
263   */
264  @Override
265  public void close() {
266    this.closed = true;
267    // We could put back the chunks to pool for reusing only when there is no
268    // opening scanner which will read their data
269    int count  = openScannerCount.get();
270    if(count == 0) {
271      recycleChunks();
272    }
273  }
274
275  /**
276   * Called when opening a scanner on the data of this MemStoreLAB
277   */
278  @Override
279  public void incScannerCount() {
280    this.openScannerCount.incrementAndGet();
281  }
282
283  /**
284   * Called when closing a scanner on the data of this MemStoreLAB
285   */
286  @Override
287  public void decScannerCount() {
288    int count = this.openScannerCount.decrementAndGet();
289    if (this.closed && count == 0) {
290      recycleChunks();
291    }
292  }
293
294  private void recycleChunks() {
295    if (reclaimed.compareAndSet(false, true)) {
296      chunkCreator.putbackChunks(chunks);
297    }
298  }
299
300  /**
301   * Try to retire the current chunk if it is still
302   * <code>c</code>. Postcondition is that curChunk.get()
303   * != c
304   * @param c the chunk to retire
305   */
306  private void tryRetireChunk(Chunk c) {
307    currChunk.compareAndSet(c, null);
308    // If the CAS succeeds, that means that we won the race
309    // to retire the chunk. We could use this opportunity to
310    // update metrics on external fragmentation.
311    //
312    // If the CAS fails, that means that someone else already
313    // retired the chunk for us.
314  }
315
316  /**
317   * Get the current chunk, or, if there is no current chunk,
318   * allocate a new one from the JVM.
319   */
320  private Chunk getOrMakeChunk() {
321    // Try to get the chunk
322    Chunk c = currChunk.get();
323    if (c != null) {
324      return c;
325    }
326    // No current chunk, so we want to allocate one. We race
327    // against other allocators to CAS in an uninitialized chunk
328    // (which is cheap to allocate)
329    if (lock.tryLock()) {
330      try {
331        // once again check inside the lock
332        c = currChunk.get();
333        if (c != null) {
334          return c;
335        }
336        c = this.chunkCreator.getChunk(idxType);
337        if (c != null) {
338          // set the curChunk. No need of CAS as only one thread will be here
339          currChunk.set(c);
340          chunks.add(c.getId());
341          return c;
342        }
343      } finally {
344        lock.unlock();
345      }
346    }
347    return null;
348  }
349
350  /* Returning a new pool chunk, without replacing current chunk,
351  ** meaning MSLABImpl does not make the returned chunk as CurChunk.
352  ** The space on this chunk will be allocated externally.
353  ** The interface is only for external callers.
354  */
355  @Override
356  public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) {
357    switch (chunkType) {
358      case INDEX_CHUNK:
359      case DATA_CHUNK:
360        Chunk c = this.chunkCreator.getChunk(chunkType);
361        chunks.add(c.getId());
362        return c;
363      case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size
364      default:
365        return null;
366    }
367  }
368
369  /* Returning a new chunk, without replacing current chunk,
370  ** meaning MSLABImpl does not make the returned chunk as CurChunk.
371  ** The space on this chunk will be allocated externally.
372  ** The interface is only for external callers.
373  ** Chunks from pools are not allocated from here, since they have fixed sizes
374  */
375  @Override
376  public Chunk getNewExternalChunk(int size) {
377    int allocSize = size + ChunkCreator.getInstance().SIZEOF_CHUNK_HEADER;
378    if (allocSize <= ChunkCreator.getInstance().getChunkSize()) {
379      return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK);
380    } else {
381      Chunk c = this.chunkCreator.getJumboChunk(size);
382      chunks.add(c.getId());
383      return c;
384    }
385  }
386
387  @Override
388  public boolean isOnHeap() {
389    return !isOffHeap();
390  }
391
392  @Override
393  public boolean isOffHeap() {
394    return this.chunkCreator.isOffheap();
395  }
396
397  @VisibleForTesting
398  Chunk getCurrentChunk() {
399    return currChunk.get();
400  }
401
402  @VisibleForTesting
403  BlockingQueue<Chunk> getPooledChunks() {
404    BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
405    for (Integer id : this.chunks) {
406      Chunk chunk = chunkCreator.getChunk(id);
407      if (chunk != null && chunk.isFromPool()) {
408        pooledChunks.add(chunk);
409      }
410    }
411    return pooledChunks;
412  }
413
414  @VisibleForTesting Integer getNumOfChunksReturnedToPool() {
415    int i = 0;
416    for (Integer id : this.chunks) {
417      if (chunkCreator.isChunkInPool(id)) {
418        i++;
419      }
420    }
421    return i;
422  }
423}