001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.nio.ByteBuffer;
022import java.util.Set;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.ConcurrentSkipListSet;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicReference;
028import java.util.concurrent.locks.ReentrantLock;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ByteBufferExtendedCell;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.KeyValueUtil;
034import org.apache.hadoop.hbase.nio.RefCnt;
035import org.apache.hadoop.hbase.regionserver.CompactingMemStore.IndexType;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
041
042/**
043 * A memstore-local allocation buffer.
044 * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) byte[] chunks
 * and then doles them out to threads that request slices into the array.
047 * <p>
048 * The purpose of this class is to combat heap fragmentation in the regionserver. By ensuring that
049 * all Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that
050 * large blocks get freed up when the memstore is flushed.
051 * <p>
 * Without the MSLAB, the byte arrays allocated during insertion end up interleaved throughout the
053 * heap, and the old generation gets progressively more fragmented until a stop-the-world compacting
054 * collection occurs.
055 * <p>
056 * TODO: we should probably benchmark whether word-aligning the allocations would provide a
057 * performance improvement - probably would speed up the Bytes.toLong/Bytes.toInt calls in KeyValue,
058 * but some of those are cached anyway. The chunks created by this MemStoreLAB can get pooled at
059 * {@link ChunkCreator}. When the Chunk comes from pool, it can be either an on heap or an off heap
060 * backed chunk. The chunks, which this MemStoreLAB creates on its own (when no chunk available from
061 * pool), those will be always on heap backed.
062 * <p>
 * NOTE: if the user requested to work with MSLABs (whether on- or off-heap), then in the
 * {@link CompactingMemStore} ctor the {@link CompactingMemStore#indexType} can only be
 * {@link IndexType#CHUNK_MAP}, that is to say the immutable segments using MSLABs are going to use
 * {@link CellChunkMap} as their index.
067 */
068@InterfaceAudience.Private
069public class MemStoreLABImpl implements MemStoreLAB {
070
071  static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class);
072
073  private AtomicReference<Chunk> currChunk = new AtomicReference<>();
074  // Lock to manage multiple handlers requesting for a chunk
075  private ReentrantLock lock = new ReentrantLock();
076
077  // A set of chunks contained by this memstore LAB
078  Set<Integer> chunks = new ConcurrentSkipListSet<Integer>();
079  private final int dataChunkSize;
080  private final int maxAlloc;
081  private final ChunkCreator chunkCreator;
082
083  // This flag is for closing this instance, its set when clearing snapshot of
084  // memstore
085  private final AtomicBoolean closed = new AtomicBoolean(false);;
086  // This flag is for reclaiming chunks. Its set when putting chunks back to
087  // pool
088  private final AtomicBoolean reclaimed = new AtomicBoolean(false);
089  /**
090   * Its initial value is 1, so it is one bigger than the current count of open scanners which
091   * reading data from this MemStoreLAB.
092   */
093  private final RefCnt refCnt;
094
095  // Used in testing
096  public MemStoreLABImpl() {
097    this(new Configuration());
098  }
099
100  public MemStoreLABImpl(Configuration conf) {
101    dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
102    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
103    this.chunkCreator = ChunkCreator.getInstance();
104    // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
105    Preconditions.checkArgument(maxAlloc <= dataChunkSize,
106      MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
107
108    this.refCnt = RefCnt.create(() -> {
109      recycleChunks();
110    });
111
112  }
113
114  @Override
115  public Cell copyCellInto(Cell cell) {
116    // See head of copyBBECellInto for how it differs from copyCellInto
117    return (cell instanceof ByteBufferExtendedCell)
118      ? copyBBECellInto((ByteBufferExtendedCell) cell, maxAlloc)
119      : copyCellInto(cell, maxAlloc);
120  }
121
122  /**
123   * When a cell's size is too big (bigger than maxAlloc), copyCellInto does not allocate it on
124   * MSLAB. Since the process of flattening to CellChunkMap assumes that all cells are allocated on
125   * MSLAB, during this process, the big cells are copied into MSLAB using this method.
126   */
127  @Override
128  public Cell forceCopyOfBigCellInto(Cell cell) {
129    int size = Segment.getCellLength(cell);
130    Preconditions.checkArgument(size >= 0, "negative size");
131    if (size + ChunkCreator.SIZEOF_CHUNK_HEADER <= dataChunkSize) {
132      // Using copyCellInto for cells which are bigger than the original maxAlloc
133      return copyCellInto(cell, dataChunkSize);
134    } else {
135      Chunk c = getNewExternalChunk(size);
136      int allocOffset = c.alloc(size);
137      return copyToChunkCell(cell, c.getData(), allocOffset, size);
138    }
139  }
140
141  /**
142   * Mostly a duplicate of {@link #copyCellInto(Cell, int)}} done for perf sake. It presumes
143   * ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than the super
144   * generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline where before
145   * it was too big. Uses less CPU. See HBASE-20875 for evidence.
146   * @see #copyCellInto(Cell, int)
147   */
148  private Cell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) {
149    int size = cell.getSerializedSize();
150    Preconditions.checkArgument(size >= 0, "negative size");
151    // Callers should satisfy large allocations from JVM heap so limit fragmentation.
152    if (size > maxAlloc) {
153      return null;
154    }
155    Chunk c = null;
156    int allocOffset = 0;
157    while (true) {
158      // Try to get the chunk
159      c = getOrMakeChunk();
160      // We may get null because the some other thread succeeded in getting the lock
161      // and so the current thread has to try again to make its chunk or grab the chunk
162      // that the other thread created
163      // Try to allocate from this chunk
164      if (c != null) {
165        allocOffset = c.alloc(size);
166        if (allocOffset != -1) {
167          // We succeeded - this is the common case - small alloc
168          // from a big buffer
169          break;
170        }
171        // not enough space!
172        // try to retire this chunk
173        tryRetireChunk(c);
174      }
175    }
176    return copyBBECToChunkCell(cell, c.getData(), allocOffset, size);
177  }
178
179  /**
180   * @see #copyBBECellInto(ByteBufferExtendedCell, int)
181   */
182  private Cell copyCellInto(Cell cell, int maxAlloc) {
183    int size = Segment.getCellLength(cell);
184    Preconditions.checkArgument(size >= 0, "negative size");
185    // Callers should satisfy large allocations directly from JVM since they
186    // don't cause fragmentation as badly.
187    if (size > maxAlloc) {
188      return null;
189    }
190    Chunk c = null;
191    int allocOffset = 0;
192    while (true) {
193      // Try to get the chunk
194      c = getOrMakeChunk();
195      // we may get null because the some other thread succeeded in getting the lock
196      // and so the current thread has to try again to make its chunk or grab the chunk
197      // that the other thread created
198      // Try to allocate from this chunk
199      if (c != null) {
200        allocOffset = c.alloc(size);
201        if (allocOffset != -1) {
202          // We succeeded - this is the common case - small alloc
203          // from a big buffer
204          break;
205        }
206        // not enough space!
207        // try to retire this chunk
208        tryRetireChunk(c);
209      }
210    }
211    return copyToChunkCell(cell, c.getData(), allocOffset, size);
212  }
213
214  /**
215   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
216   * out of it
217   * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int)
218   */
219  private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
220    int tagsLen = cell.getTagsLength();
221    if (cell instanceof ExtendedCell) {
222      ((ExtendedCell) cell).write(buf, offset);
223    } else {
224      // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
225      // other case also. The data fragments within Cell is copied into buf as in KeyValue
226      // serialization format only.
227      KeyValueUtil.appendTo(cell, buf, offset, true);
228    }
229    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
230  }
231
232  /**
233   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
234   * out of it
235   * @see #copyToChunkCell(Cell, ByteBuffer, int, int)
236   */
237  private static Cell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf, int offset,
238    int len) {
239    int tagsLen = cell.getTagsLength();
240    cell.write(buf, offset);
241    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
242  }
243
244  private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen,
245    long sequenceId) {
246    // TODO : write the seqid here. For writing seqId we should create a new cell type so
247    // that seqId is not used as the state
248    if (tagsLen == 0) {
249      // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
250      // which directly return tagsLen as 0. So we avoid parsing many length components in
251      // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
252      // call getTagsLength().
253      return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId);
254    } else {
255      return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId);
256    }
257  }
258
259  /**
260   * Close this instance since it won't be used any more, try to put the chunks back to pool
261   */
262  @Override
263  public void close() {
264    if (!this.closed.compareAndSet(false, true)) {
265      return;
266    }
267    // We could put back the chunks to pool for reusing only when there is no
268    // opening scanner which will read their data
269    this.refCnt.release();
270  }
271
272  @RestrictedApi(explanation = "Should only be called in tests", link = "",
273      allowedOnPath = ".*/src/test/.*")
274  int getRefCntValue() {
275    return this.refCnt.refCnt();
276  }
277
278  /**
279   * Called when opening a scanner on the data of this MemStoreLAB
280   */
281  @Override
282  public void incScannerCount() {
283    this.refCnt.retain();
284  }
285
286  /**
287   * Called when closing a scanner on the data of this MemStoreLAB
288   */
289  @Override
290  public void decScannerCount() {
291    this.refCnt.release();
292  }
293
294  private void recycleChunks() {
295    if (reclaimed.compareAndSet(false, true)) {
296      chunkCreator.putbackChunks(chunks);
297      chunks.clear();
298    }
299  }
300
301  /**
302   * Try to retire the current chunk if it is still <code>c</code>. Postcondition is that
303   * curChunk.get() != c
304   * @param c the chunk to retire
305   */
306  private void tryRetireChunk(Chunk c) {
307    currChunk.compareAndSet(c, null);
308    // If the CAS succeeds, that means that we won the race
309    // to retire the chunk. We could use this opportunity to
310    // update metrics on external fragmentation.
311    //
312    // If the CAS fails, that means that someone else already
313    // retired the chunk for us.
314  }
315
316  /**
317   * Get the current chunk, or, if there is no current chunk, allocate a new one from the JVM.
318   */
319  private Chunk getOrMakeChunk() {
320    // Try to get the chunk
321    Chunk c = currChunk.get();
322    if (c != null) {
323      return c;
324    }
325    // No current chunk, so we want to allocate one. We race
326    // against other allocators to CAS in an uninitialized chunk
327    // (which is cheap to allocate)
328    if (lock.tryLock()) {
329      try {
330        // once again check inside the lock
331        c = currChunk.get();
332        if (c != null) {
333          return c;
334        }
335        c = this.chunkCreator.getChunk();
336        if (c != null) {
337          // set the curChunk. No need of CAS as only one thread will be here
338          currChunk.set(c);
339          chunks.add(c.getId());
340          return c;
341        }
342      } finally {
343        lock.unlock();
344      }
345    }
346    return null;
347  }
348
349  /*
350   * Returning a new pool chunk, without replacing current chunk, meaning MSLABImpl does not make
351   * the returned chunk as CurChunk. The space on this chunk will be allocated externally. The
352   * interface is only for external callers.
353   */
354  @Override
355  public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) {
356    switch (chunkType) {
357      case INDEX_CHUNK:
358      case DATA_CHUNK:
359        Chunk c = this.chunkCreator.getChunk(chunkType);
360        chunks.add(c.getId());
361        return c;
362      case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size
363      default:
364        return null;
365    }
366  }
367
368  /*
369   * Returning a new chunk, without replacing current chunk, meaning MSLABImpl does not make the
370   * returned chunk as CurChunk. The space on this chunk will be allocated externally. The interface
371   * is only for external callers. Chunks from pools are not allocated from here, since they have
372   * fixed sizes
373   */
374  @Override
375  public Chunk getNewExternalChunk(int size) {
376    int allocSize = size + ChunkCreator.SIZEOF_CHUNK_HEADER;
377    if (allocSize <= ChunkCreator.getInstance().getChunkSize()) {
378      return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK);
379    } else {
380      Chunk c = this.chunkCreator.getJumboChunk(size);
381      chunks.add(c.getId());
382      return c;
383    }
384  }
385
386  @Override
387  public boolean isOnHeap() {
388    return !isOffHeap();
389  }
390
391  @Override
392  public boolean isOffHeap() {
393    return this.chunkCreator.isOffheap();
394  }
395
396  Chunk getCurrentChunk() {
397    return currChunk.get();
398  }
399
400  BlockingQueue<Chunk> getPooledChunks() {
401    BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
402    for (Integer id : this.chunks) {
403      Chunk chunk = chunkCreator.getChunk(id);
404      if (chunk != null && chunk.isFromPool()) {
405        pooledChunks.add(chunk);
406      }
407    }
408    return pooledChunks;
409  }
410
411  Integer getNumOfChunksReturnedToPool(Set<Integer> chunksId) {
412    int i = 0;
413    for (Integer id : chunksId) {
414      if (chunkCreator.isChunkInPool(id)) {
415        i++;
416      }
417    }
418    return i;
419  }
420
421  boolean isReclaimed() {
422    return reclaimed.get();
423  }
424
425  boolean isClosed() {
426    return closed.get();
427  }
428}