/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * A memstore-local allocation buffer.
 * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) byte[]
 * chunks and then doles out slices of those chunks to threads that request them.
 * <p>
 * The purpose of this class is to combat heap fragmentation in the regionserver. By ensuring
 * that all Cells in a given memstore refer only to large chunks of contiguous memory, we ensure
 * that large blocks get freed up when the memstore is flushed.
 * <p>
 * Without the MSLAB, the byte arrays allocated during insertion end up interleaved throughout
 * the heap, and the old generation gets progressively more fragmented until a stop-the-world
 * compacting collection occurs.
 * <p>
 * TODO: we should probably benchmark whether word-aligning the allocations would provide a
 * performance improvement - probably would speed up the Bytes.toLong/Bytes.toInt calls in
 * KeyValue, but some of those are cached anyway.
 * <p>
 * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}. When a chunk
 * comes from the pool, it can be backed by either on-heap or off-heap memory. Chunks that this
 * MemStoreLAB creates on its own (when no chunk is available from the pool) are always on-heap
 * backed.
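 * <p>
 * A minimal usage sketch (simplified; it assumes a {@code Configuration conf} and an incoming
 * {@code Cell cell} are in scope, and in practice the memstore segment code drives this flow):
 * <pre>{@code
 * MemStoreLAB lab = new MemStoreLABImpl(conf);
 * // copyCellInto returns null for cells bigger than the configured max allocation;
 * // in that case the caller keeps using the original, heap-backed cell.
 * Cell copied = lab.copyCellInto(cell);
 * Cell toAdd = copied != null ? copied : cell;
 * // ... index toAdd in the segment ...
 * lab.close(); // chunks may go back to the pool once all scanners are done
 * }</pre>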
 */
@InterfaceAudience.Private
public class MemStoreLABImpl implements MemStoreLAB {

  static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class);

  private AtomicReference<Chunk> currChunk = new AtomicReference<>();
  // Lock to manage multiple handlers requesting a chunk
  private ReentrantLock lock = new ReentrantLock();

  // A set of chunks contained by this memstore LAB
  @VisibleForTesting
  Set<Integer> chunks = new ConcurrentSkipListSet<Integer>();
  private final int dataChunkSize;
  private final int maxAlloc;
  private final ChunkCreator chunkCreator;
  private final CompactingMemStore.IndexType idxType; // which index is used for the corresponding segment

  // This flag is for closing this instance; it's set when clearing the snapshot of
  // the memstore
  private volatile boolean closed = false;
  // This flag is for reclaiming chunks; it's set when putting chunks back to
  // the pool
  private AtomicBoolean reclaimed = new AtomicBoolean(false);
  // Current count of open scanners reading data from this MemStoreLAB
  private final AtomicInteger openScannerCount = new AtomicInteger();

  // Used in testing
  public MemStoreLABImpl() {
    this(new Configuration());
  }

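  // CHUNK_SIZE_KEY / MAX_ALLOC_KEY and their defaults are inherited from the MemStoreLAB
  // interface; they control the chunk size and the largest allocation that will be served
  // from a chunk rather than directly from the JVM heap.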
  public MemStoreLABImpl(Configuration conf) {
    dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
    this.chunkCreator = ChunkCreator.getInstance();
    // if we don't exclude allocations >CHUNK_SIZE, we'd infinite-loop on one!
    Preconditions.checkArgument(maxAlloc <= dataChunkSize,
        MAX_ALLOC_KEY + " must be less than or equal to " + CHUNK_SIZE_KEY);

    // if the user requested to work with MSLABs (whether on- or off-heap), then the
    // immutable segments are going to use CellChunkMap as their index
    idxType = CompactingMemStore.IndexType.CHUNK_MAP;
  }

  @Override
  public Cell copyCellInto(Cell cell) {
    // See head of copyBBECellInto for how it differs from copyCellInto
    return (cell instanceof ByteBufferExtendedCell)?
        copyBBECellInto((ByteBufferExtendedCell)cell, maxAlloc):
        copyCellInto(cell, maxAlloc);
  }

  /**
   * When a cell's size is too big (bigger than maxAlloc),
   * copyCellInto does not allocate it on MSLAB.
   * Since the process of flattening to CellChunkMap assumes that
   * all cells are allocated on MSLAB, during this process,
   * the big cells are copied into MSLAB using this method.
   */
  @Override
  public Cell forceCopyOfBigCellInto(Cell cell) {
    int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
        KeyValueUtil.length(cell);
    size += ChunkCreator.SIZEOF_CHUNK_HEADER;
    Preconditions.checkArgument(size >= 0, "negative size");
    if (size <= dataChunkSize) {
      // Using copyCellInto for cells which are bigger than the original maxAlloc
      return copyCellInto(cell, dataChunkSize);
    } else {
      Chunk c = getNewExternalChunk(size);
      int allocOffset = c.alloc(size);
      return copyToChunkCell(cell, c.getData(), allocOffset, size);
    }
  }

  /**
   * Mostly a duplicate of {@link #copyCellInto(Cell, int)} done for performance's sake. It
   * presumes ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than
   * the super generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline
   * where before it was too big. Uses less CPU. See HBASE-20875 for evidence.
   * @see #copyCellInto(Cell, int)
   */
  private Cell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) {
    int size = cell.getSerializedSize();
    Preconditions.checkArgument(size >= 0, "negative size");
    // Callers should satisfy large allocations from the JVM heap so we limit fragmentation.
    if (size > maxAlloc) {
      return null;
    }
    Chunk c = null;
    int allocOffset = 0;
    while (true) {
      // Try to get the chunk
      c = getOrMakeChunk();
      // We may get null because some other thread succeeded in getting the lock
      // and so the current thread has to try again to make its chunk or grab the chunk
      // that the other thread created
      // Try to allocate from this chunk
      if (c != null) {
        allocOffset = c.alloc(size);
        if (allocOffset != -1) {
          // We succeeded - this is the common case - small alloc
          // from a big buffer
          break;
        }
        // not enough space!
        // try to retire this chunk
        tryRetireChunk(c);
      }
    }
    return copyBBECToChunkCell(cell, c.getData(), allocOffset, size);
  }

  /**
   * @see #copyBBECellInto(ByteBufferExtendedCell, int)
   */
  private Cell copyCellInto(Cell cell, int maxAlloc) {
    int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
        KeyValueUtil.length(cell);
    Preconditions.checkArgument(size >= 0, "negative size");
    // Callers should satisfy large allocations directly from the JVM since they
    // don't cause fragmentation as badly.
    if (size > maxAlloc) {
      return null;
    }
    Chunk c = null;
    int allocOffset = 0;
    while (true) {
      // Try to get the chunk
      c = getOrMakeChunk();
      // We may get null because some other thread succeeded in getting the lock
      // and so the current thread has to try again to make its chunk or grab the chunk
      // that the other thread created
      // Try to allocate from this chunk
      if (c != null) {
        allocOffset = c.alloc(size);
        if (allocOffset != -1) {
          // We succeeded - this is the common case - small alloc
          // from a big buffer
          break;
        }
        // not enough space!
        // try to retire this chunk
        tryRetireChunk(c);
      }
    }
    return copyToChunkCell(cell, c.getData(), allocOffset, size);
  }

  /**
   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
   * out of it
   * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int)
   */
  private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
    int tagsLen = cell.getTagsLength();
    if (cell instanceof ExtendedCell) {
      ((ExtendedCell) cell).write(buf, offset);
    } else {
      // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
      // other case also. The data fragments within the Cell are copied into buf in the KeyValue
      // serialization format.
      KeyValueUtil.appendTo(cell, buf, offset, true);
    }
    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
  }

  /**
   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
   * out of it
   * @see #copyToChunkCell(Cell, ByteBuffer, int, int)
   */
  private static Cell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf, int offset,
      int len) {
    int tagsLen = cell.getTagsLength();
    cell.write(buf, offset);
    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
  }

  private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen,
      long sequenceId) {
    // TODO : write the seqid here. For writing seqId we should create a new cell type so
    // that seqId is not used as the state
    if (tagsLen == 0) {
      // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
      // which directly returns tagsLen as 0. So we avoid parsing many length components when
      // reading the tagsLength stored in the backing buffer. The memstore addition of every Cell
      // calls getTagsLength().
      return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId);
    } else {
      return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId);
    }
  }

  /**
   * Close this instance since it won't be used any more; try to put the chunks
   * back to the pool.
   */
  @Override
  public void close() {
    this.closed = true;
    // We can put the chunks back to the pool for reuse only when there is no
    // open scanner reading their data
    int count = openScannerCount.get();
    if (count == 0) {
      recycleChunks();
    }
  }

  @VisibleForTesting
  int getOpenScannerCount() {
    return this.openScannerCount.get();
  }

  /**
   * Called when opening a scanner on the data of this MemStoreLAB
   */
  @Override
  public void incScannerCount() {
    this.openScannerCount.incrementAndGet();
  }

  /**
   * Called when closing a scanner on the data of this MemStoreLAB
   */
  @Override
  public void decScannerCount() {
    int count = this.openScannerCount.decrementAndGet();
    if (this.closed && count == 0) {
      recycleChunks();
    }
  }

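  // Puts this LAB's chunks back to the pool. The CAS on 'reclaimed' ensures the chunks are
  // returned at most once, even if close() and the last decScannerCount() race to get here.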
  private void recycleChunks() {
    if (reclaimed.compareAndSet(false, true)) {
      chunkCreator.putbackChunks(chunks);
    }
  }

  /**
   * Try to retire the current chunk if it is still
   * <code>c</code>. Postcondition is that currChunk.get()
   * != c
   * @param c the chunk to retire
   */
  private void tryRetireChunk(Chunk c) {
    currChunk.compareAndSet(c, null);
    // If the CAS succeeds, that means that we won the race
    // to retire the chunk. We could use this opportunity to
    // update metrics on external fragmentation.
    //
    // If the CAS fails, that means that someone else already
    // retired the chunk for us.
  }

  /**
   * Get the current chunk, or, if there is no current chunk,
   * allocate a new one from the JVM.
   */
  private Chunk getOrMakeChunk() {
    // Try to get the chunk
    Chunk c = currChunk.get();
    if (c != null) {
      return c;
    }
    // No current chunk, so we want to allocate one. We race
    // against other allocators to CAS in an uninitialized chunk
    // (which is cheap to allocate)
    if (lock.tryLock()) {
      try {
        // once again check inside the lock
        c = currChunk.get();
        if (c != null) {
          return c;
        }
        c = this.chunkCreator.getChunk(idxType);
        if (c != null) {
          // set the currChunk. No need for CAS as only one thread will be here
          currChunk.set(c);
          chunks.add(c.getId());
          return c;
        }
      } finally {
        lock.unlock();
      }
    }
    return null;
  }

  /* Returns a new chunk from the pool without replacing the current chunk,
  ** i.e. MemStoreLABImpl does not make the returned chunk its currChunk.
  ** The space on this chunk will be allocated externally.
  ** This method is intended for external callers only.
  */
  @Override
  public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) {
    switch (chunkType) {
      case INDEX_CHUNK:
      case DATA_CHUNK:
        Chunk c = this.chunkCreator.getChunk(chunkType);
        chunks.add(c.getId());
        return c;
      case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size
      default:
        return null;
    }
  }

  /* Returns a new chunk without replacing the current chunk,
  ** i.e. MemStoreLABImpl does not make the returned chunk its currChunk.
  ** The space on this chunk will be allocated externally.
  ** This method is intended for external callers only.
  ** Requests bigger than a regular chunk are served with a jumbo chunk, which never comes
  ** from the pool since pooled chunks have fixed sizes.
  */
  @Override
  public Chunk getNewExternalChunk(int size) {
    int allocSize = size + ChunkCreator.SIZEOF_CHUNK_HEADER;
    if (allocSize <= ChunkCreator.getInstance().getChunkSize()) {
      return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK);
    } else {
      Chunk c = this.chunkCreator.getJumboChunk(size);
      chunks.add(c.getId());
      return c;
    }
  }

  @Override
  public boolean isOnHeap() {
    return !isOffHeap();
  }

  @Override
  public boolean isOffHeap() {
    return this.chunkCreator.isOffheap();
  }

  @VisibleForTesting
  Chunk getCurrentChunk() {
    return currChunk.get();
  }

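  // Test helper: collect the chunks tracked by this LAB that were handed out from the pool.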
  @VisibleForTesting
  BlockingQueue<Chunk> getPooledChunks() {
    BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
    for (Integer id : this.chunks) {
      Chunk chunk = chunkCreator.getChunk(id);
      if (chunk != null && chunk.isFromPool()) {
        pooledChunks.add(chunk);
      }
    }
    return pooledChunks;
  }

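  // Test helper: count how many of this LAB's chunks have been returned to the pool.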
  @VisibleForTesting
  Integer getNumOfChunksReturnedToPool() {
    int i = 0;
    for (Integer id : this.chunks) {
      if (chunkCreator.isChunkInPool(id)) {
        i++;
      }
    }
    return i;
  }
}