001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.nio.ByteBuffer;
022import java.util.Set;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.ConcurrentSkipListSet;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.concurrent.atomic.AtomicReference;
029import java.util.concurrent.locks.ReentrantLock;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.ByteBufferExtendedCell;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.ExtendedCell;
034import org.apache.hadoop.hbase.KeyValueUtil;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
040
041/**
042 * A memstore-local allocation buffer.
043 * <p>
044 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
045 * big (2MB) byte[] chunks from and then doles it out to threads that request
046 * slices into the array.
047 * <p>
048 * The purpose of this class is to combat heap fragmentation in the
049 * regionserver. By ensuring that all Cells in a given memstore refer
050 * only to large chunks of contiguous memory, we ensure that large blocks
051 * get freed up when the memstore is flushed.
052 * <p>
053 * Without the MSLAB, the byte array allocated during insertion end up
054 * interleaved throughout the heap, and the old generation gets progressively
055 * more fragmented until a stop-the-world compacting collection occurs.
056 * <p>
057 * TODO: we should probably benchmark whether word-aligning the allocations
058 * would provide a performance improvement - probably would speed up the
059 * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
060 * anyway.
061 * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}.
062 * When the Chunk comes from pool, it can be either an on heap or an off heap backed chunk. The chunks,
063 * which this MemStoreLAB creates on its own (when no chunk available from pool), those will be
064 * always on heap backed.
065 */
066@InterfaceAudience.Private
067public class MemStoreLABImpl implements MemStoreLAB {
068
069  static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class);
070
071  private AtomicReference<Chunk> currChunk = new AtomicReference<>();
072  // Lock to manage multiple handlers requesting for a chunk
073  private ReentrantLock lock = new ReentrantLock();
074
075  // A set of chunks contained by this memstore LAB
076  Set<Integer> chunks = new ConcurrentSkipListSet<Integer>();
077  private final int dataChunkSize;
078  private final int maxAlloc;
079  private final ChunkCreator chunkCreator;
080  private final CompactingMemStore.IndexType idxType; // what index is used for corresponding segment
081
082  // This flag is for closing this instance, its set when clearing snapshot of
083  // memstore
084  private volatile boolean closed = false;
085  // This flag is for reclaiming chunks. Its set when putting chunks back to
086  // pool
087  private AtomicBoolean reclaimed = new AtomicBoolean(false);
088  // Current count of open scanners which reading data from this MemStoreLAB
089  private final AtomicInteger openScannerCount = new AtomicInteger();
090
091  // Used in testing
092  public MemStoreLABImpl() {
093    this(new Configuration());
094  }
095
096  public MemStoreLABImpl(Configuration conf) {
097    dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
098    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
099    this.chunkCreator = ChunkCreator.getInstance();
100    // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
101    Preconditions.checkArgument(maxAlloc <= dataChunkSize,
102        MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
103
104    // if user requested to work with MSLABs (whether on- or off-heap), then the
105    // immutable segments are going to use CellChunkMap as their index
106    idxType = CompactingMemStore.IndexType.CHUNK_MAP;
107  }
108
109  @Override
110  public Cell copyCellInto(Cell cell) {
111    // See head of copyBBECellInto for how it differs from copyCellInto
112    return (cell instanceof ByteBufferExtendedCell)?
113        copyBBECellInto((ByteBufferExtendedCell)cell, maxAlloc):
114        copyCellInto(cell, maxAlloc);
115  }
116
117  /**
118   * When a cell's size is too big (bigger than maxAlloc),
119   * copyCellInto does not allocate it on MSLAB.
120   * Since the process of flattening to CellChunkMap assumes that
121   * all cells are allocated on MSLAB, during this process,
122   * the big cells are copied into MSLAB using this method.
123   */
124  @Override
125  public Cell forceCopyOfBigCellInto(Cell cell) {
126    int size = Segment.getCellLength(cell);
127    size += ChunkCreator.SIZEOF_CHUNK_HEADER;
128    Preconditions.checkArgument(size >= 0, "negative size");
129    if (size <= dataChunkSize) {
130      // Using copyCellInto for cells which are bigger than the original maxAlloc
131      return copyCellInto(cell, dataChunkSize);
132    } else {
133      Chunk c = getNewExternalChunk(size);
134      int allocOffset = c.alloc(size);
135      return copyToChunkCell(cell, c.getData(), allocOffset, size);
136    }
137  }
138
139  /**
140   * Mostly a duplicate of {@link #copyCellInto(Cell, int)}} done for perf sake. It presumes
141   * ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than the
142   * super generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline where
143   * before it was too big. Uses less CPU. See HBASE-20875 for evidence.
144   * @see #copyCellInto(Cell, int)
145   */
146  private Cell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) {
147    int size = cell.getSerializedSize();
148    Preconditions.checkArgument(size >= 0, "negative size");
149    // Callers should satisfy large allocations from JVM heap so limit fragmentation.
150    if (size > maxAlloc) {
151      return null;
152    }
153    Chunk c = null;
154    int allocOffset = 0;
155    while (true) {
156      // Try to get the chunk
157      c = getOrMakeChunk();
158      // We may get null because the some other thread succeeded in getting the lock
159      // and so the current thread has to try again to make its chunk or grab the chunk
160      // that the other thread created
161      // Try to allocate from this chunk
162      if (c != null) {
163        allocOffset = c.alloc(size);
164        if (allocOffset != -1) {
165          // We succeeded - this is the common case - small alloc
166          // from a big buffer
167          break;
168        }
169        // not enough space!
170        // try to retire this chunk
171        tryRetireChunk(c);
172      }
173    }
174    return copyBBECToChunkCell(cell, c.getData(), allocOffset, size);
175  }
176
177  /**
178   * @see #copyBBECellInto(ByteBufferExtendedCell, int)
179   */
180  private Cell copyCellInto(Cell cell, int maxAlloc) {
181    int size = Segment.getCellLength(cell);
182    Preconditions.checkArgument(size >= 0, "negative size");
183    // Callers should satisfy large allocations directly from JVM since they
184    // don't cause fragmentation as badly.
185    if (size > maxAlloc) {
186      return null;
187    }
188    Chunk c = null;
189    int allocOffset = 0;
190    while (true) {
191      // Try to get the chunk
192      c = getOrMakeChunk();
193      // we may get null because the some other thread succeeded in getting the lock
194      // and so the current thread has to try again to make its chunk or grab the chunk
195      // that the other thread created
196      // Try to allocate from this chunk
197      if (c != null) {
198        allocOffset = c.alloc(size);
199        if (allocOffset != -1) {
200          // We succeeded - this is the common case - small alloc
201          // from a big buffer
202          break;
203        }
204        // not enough space!
205        // try to retire this chunk
206        tryRetireChunk(c);
207      }
208    }
209    return copyToChunkCell(cell, c.getData(), allocOffset, size);
210  }
211
212  /**
213   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
214   * out of it
215   * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int)
216   */
217  private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
218    int tagsLen = cell.getTagsLength();
219    if (cell instanceof ExtendedCell) {
220      ((ExtendedCell) cell).write(buf, offset);
221    } else {
222      // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
223      // other case also. The data fragments within Cell is copied into buf as in KeyValue
224      // serialization format only.
225      KeyValueUtil.appendTo(cell, buf, offset, true);
226    }
227    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
228  }
229
230  /**
231   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
232   * out of it
233   * @see #copyToChunkCell(Cell, ByteBuffer, int, int)
234   */
235  private static Cell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf, int offset,
236      int len) {
237    int tagsLen = cell.getTagsLength();
238    cell.write(buf, offset);
239    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
240  }
241
242  private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen,
243      long sequenceId) {
244    // TODO : write the seqid here. For writing seqId we should create a new cell type so
245    // that seqId is not used as the state
246    if (tagsLen == 0) {
247      // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
248      // which directly return tagsLen as 0. So we avoid parsing many length components in
249      // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
250      // call getTagsLength().
251      return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId);
252    } else {
253      return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId);
254    }
255  }
256
257  /**
258   * Close this instance since it won't be used any more, try to put the chunks
259   * back to pool
260   */
261  @Override
262  public void close() {
263    this.closed = true;
264    // We could put back the chunks to pool for reusing only when there is no
265    // opening scanner which will read their data
266    int count  = openScannerCount.get();
267    if(count == 0) {
268      recycleChunks();
269    }
270  }
271
272  int getOpenScannerCount() {
273    return this.openScannerCount.get();
274  }
275
276  /**
277   * Called when opening a scanner on the data of this MemStoreLAB
278   */
279  @Override
280  public void incScannerCount() {
281    this.openScannerCount.incrementAndGet();
282  }
283
284  /**
285   * Called when closing a scanner on the data of this MemStoreLAB
286   */
287  @Override
288  public void decScannerCount() {
289    int count = this.openScannerCount.decrementAndGet();
290    if (this.closed && count == 0) {
291      recycleChunks();
292    }
293  }
294
295  private void recycleChunks() {
296    if (reclaimed.compareAndSet(false, true)) {
297      chunkCreator.putbackChunks(chunks);
298    }
299  }
300
301  /**
302   * Try to retire the current chunk if it is still
303   * <code>c</code>. Postcondition is that curChunk.get()
304   * != c
305   * @param c the chunk to retire
306   */
307  private void tryRetireChunk(Chunk c) {
308    currChunk.compareAndSet(c, null);
309    // If the CAS succeeds, that means that we won the race
310    // to retire the chunk. We could use this opportunity to
311    // update metrics on external fragmentation.
312    //
313    // If the CAS fails, that means that someone else already
314    // retired the chunk for us.
315  }
316
317  /**
318   * Get the current chunk, or, if there is no current chunk,
319   * allocate a new one from the JVM.
320   */
321  private Chunk getOrMakeChunk() {
322    // Try to get the chunk
323    Chunk c = currChunk.get();
324    if (c != null) {
325      return c;
326    }
327    // No current chunk, so we want to allocate one. We race
328    // against other allocators to CAS in an uninitialized chunk
329    // (which is cheap to allocate)
330    if (lock.tryLock()) {
331      try {
332        // once again check inside the lock
333        c = currChunk.get();
334        if (c != null) {
335          return c;
336        }
337        c = this.chunkCreator.getChunk(idxType);
338        if (c != null) {
339          // set the curChunk. No need of CAS as only one thread will be here
340          currChunk.set(c);
341          chunks.add(c.getId());
342          return c;
343        }
344      } finally {
345        lock.unlock();
346      }
347    }
348    return null;
349  }
350
351  /* Returning a new pool chunk, without replacing current chunk,
352  ** meaning MSLABImpl does not make the returned chunk as CurChunk.
353  ** The space on this chunk will be allocated externally.
354  ** The interface is only for external callers.
355  */
356  @Override
357  public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) {
358    switch (chunkType) {
359      case INDEX_CHUNK:
360      case DATA_CHUNK:
361        Chunk c = this.chunkCreator.getChunk(chunkType);
362        chunks.add(c.getId());
363        return c;
364      case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size
365      default:
366        return null;
367    }
368  }
369
370  /* Returning a new chunk, without replacing current chunk,
371  ** meaning MSLABImpl does not make the returned chunk as CurChunk.
372  ** The space on this chunk will be allocated externally.
373  ** The interface is only for external callers.
374  ** Chunks from pools are not allocated from here, since they have fixed sizes
375  */
376  @Override
377  public Chunk getNewExternalChunk(int size) {
378    int allocSize = size + ChunkCreator.SIZEOF_CHUNK_HEADER;
379    if (allocSize <= ChunkCreator.getInstance().getChunkSize()) {
380      return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK);
381    } else {
382      Chunk c = this.chunkCreator.getJumboChunk(size);
383      chunks.add(c.getId());
384      return c;
385    }
386  }
387
388  @Override
389  public boolean isOnHeap() {
390    return !isOffHeap();
391  }
392
393  @Override
394  public boolean isOffHeap() {
395    return this.chunkCreator.isOffheap();
396  }
397
398  Chunk getCurrentChunk() {
399    return currChunk.get();
400  }
401
402  BlockingQueue<Chunk> getPooledChunks() {
403    BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
404    for (Integer id : this.chunks) {
405      Chunk chunk = chunkCreator.getChunk(id);
406      if (chunk != null && chunk.isFromPool()) {
407        pooledChunks.add(chunk);
408      }
409    }
410    return pooledChunks;
411  }
412
413  Integer getNumOfChunksReturnedToPool() {
414    int i = 0;
415    for (Integer id : this.chunks) {
416      if (chunkCreator.isChunkInPool(id)) {
417        i++;
418      }
419    }
420    return i;
421  }
422}