001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.nio.ByteBuffer; 022import java.util.Set; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.ConcurrentSkipListSet; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicInteger; 028import java.util.concurrent.atomic.AtomicReference; 029import java.util.concurrent.locks.ReentrantLock; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.ByteBufferExtendedCell; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.ExtendedCell; 035import org.apache.hadoop.hbase.KeyValueUtil; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 041/** 042 * A memstore-local allocation buffer. 
 * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator: it obtains big (2MB by default)
 * chunks of memory and then doles out slices of those chunks to the threads that request them.
 * <p>
 * The purpose of this class is to combat heap fragmentation in the
 * regionserver. By ensuring that all Cells in a given memstore refer
 * only to large chunks of contiguous memory, we ensure that large blocks
 * get freed up when the memstore is flushed.
 * <p>
 * Without the MSLAB, the byte arrays allocated during insertion end up
 * interleaved throughout the heap, and the old generation gets progressively
 * more fragmented until a stop-the-world compacting collection occurs.
 * <p>
 * TODO: we should probably benchmark whether word-aligning the allocations
 * would provide a performance improvement - probably would speed up the
 * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
 * anyway.
 * <p>
 * The chunks created by this MemStoreLAB can be pooled by {@link ChunkCreator}.
 * A Chunk that comes from the pool can be backed by either on-heap or off-heap memory. The
 * chunks which this MemStoreLAB creates on its own (when no chunk is available from the pool)
 * are always backed by on-heap memory.
065 */ 066@InterfaceAudience.Private 067public class MemStoreLABImpl implements MemStoreLAB { 068 069 static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class); 070 071 private AtomicReference<Chunk> currChunk = new AtomicReference<>(); 072 // Lock to manage multiple handlers requesting for a chunk 073 private ReentrantLock lock = new ReentrantLock(); 074 075 // A set of chunks contained by this memstore LAB 076 @VisibleForTesting 077 Set<Integer> chunks = new ConcurrentSkipListSet<Integer>(); 078 private final int dataChunkSize; 079 private final int maxAlloc; 080 private final ChunkCreator chunkCreator; 081 private final CompactingMemStore.IndexType idxType; // what index is used for corresponding segment 082 083 // This flag is for closing this instance, its set when clearing snapshot of 084 // memstore 085 private volatile boolean closed = false; 086 // This flag is for reclaiming chunks. Its set when putting chunks back to 087 // pool 088 private AtomicBoolean reclaimed = new AtomicBoolean(false); 089 // Current count of open scanners which reading data from this MemStoreLAB 090 private final AtomicInteger openScannerCount = new AtomicInteger(); 091 092 // Used in testing 093 public MemStoreLABImpl() { 094 this(new Configuration()); 095 } 096 097 public MemStoreLABImpl(Configuration conf) { 098 dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); 099 maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); 100 this.chunkCreator = ChunkCreator.getInstance(); 101 // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! 
102 Preconditions.checkArgument(maxAlloc <= dataChunkSize, 103 MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); 104 105 // if user requested to work with MSLABs (whether on- or off-heap), then the 106 // immutable segments are going to use CellChunkMap as their index 107 idxType = CompactingMemStore.IndexType.CHUNK_MAP; 108 } 109 110 @Override 111 public Cell copyCellInto(Cell cell) { 112 // See head of copyBBECellInto for how it differs from copyCellInto 113 return (cell instanceof ByteBufferExtendedCell)? 114 copyBBECellInto((ByteBufferExtendedCell)cell, maxAlloc): 115 copyCellInto(cell, maxAlloc); 116 } 117 118 /** 119 * When a cell's size is too big (bigger than maxAlloc), 120 * copyCellInto does not allocate it on MSLAB. 121 * Since the process of flattening to CellChunkMap assumes that 122 * all cells are allocated on MSLAB, during this process, 123 * the big cells are copied into MSLAB using this method. 124 */ 125 @Override 126 public Cell forceCopyOfBigCellInto(Cell cell) { 127 int size = Segment.getCellLength(cell); 128 size += ChunkCreator.SIZEOF_CHUNK_HEADER; 129 Preconditions.checkArgument(size >= 0, "negative size"); 130 if (size <= dataChunkSize) { 131 // Using copyCellInto for cells which are bigger than the original maxAlloc 132 return copyCellInto(cell, dataChunkSize); 133 } else { 134 Chunk c = getNewExternalChunk(size); 135 int allocOffset = c.alloc(size); 136 return copyToChunkCell(cell, c.getData(), allocOffset, size); 137 } 138 } 139 140 /** 141 * Mostly a duplicate of {@link #copyCellInto(Cell, int)}} done for perf sake. It presumes 142 * ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than the 143 * super generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline where 144 * before it was too big. Uses less CPU. See HBASE-20875 for evidence. 
145 * @see #copyCellInto(Cell, int) 146 */ 147 private Cell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) { 148 int size = cell.getSerializedSize(); 149 Preconditions.checkArgument(size >= 0, "negative size"); 150 // Callers should satisfy large allocations from JVM heap so limit fragmentation. 151 if (size > maxAlloc) { 152 return null; 153 } 154 Chunk c = null; 155 int allocOffset = 0; 156 while (true) { 157 // Try to get the chunk 158 c = getOrMakeChunk(); 159 // We may get null because the some other thread succeeded in getting the lock 160 // and so the current thread has to try again to make its chunk or grab the chunk 161 // that the other thread created 162 // Try to allocate from this chunk 163 if (c != null) { 164 allocOffset = c.alloc(size); 165 if (allocOffset != -1) { 166 // We succeeded - this is the common case - small alloc 167 // from a big buffer 168 break; 169 } 170 // not enough space! 171 // try to retire this chunk 172 tryRetireChunk(c); 173 } 174 } 175 return copyBBECToChunkCell(cell, c.getData(), allocOffset, size); 176 } 177 178 /** 179 * @see #copyBBECellInto(ByteBufferExtendedCell, int) 180 */ 181 private Cell copyCellInto(Cell cell, int maxAlloc) { 182 int size = Segment.getCellLength(cell); 183 Preconditions.checkArgument(size >= 0, "negative size"); 184 // Callers should satisfy large allocations directly from JVM since they 185 // don't cause fragmentation as badly. 
186 if (size > maxAlloc) { 187 return null; 188 } 189 Chunk c = null; 190 int allocOffset = 0; 191 while (true) { 192 // Try to get the chunk 193 c = getOrMakeChunk(); 194 // we may get null because the some other thread succeeded in getting the lock 195 // and so the current thread has to try again to make its chunk or grab the chunk 196 // that the other thread created 197 // Try to allocate from this chunk 198 if (c != null) { 199 allocOffset = c.alloc(size); 200 if (allocOffset != -1) { 201 // We succeeded - this is the common case - small alloc 202 // from a big buffer 203 break; 204 } 205 // not enough space! 206 // try to retire this chunk 207 tryRetireChunk(c); 208 } 209 } 210 return copyToChunkCell(cell, c.getData(), allocOffset, size); 211 } 212 213 /** 214 * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid 215 * out of it 216 * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int) 217 */ 218 private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) { 219 int tagsLen = cell.getTagsLength(); 220 if (cell instanceof ExtendedCell) { 221 ((ExtendedCell) cell).write(buf, offset); 222 } else { 223 // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the 224 // other case also. The data fragments within Cell is copied into buf as in KeyValue 225 // serialization format only. 
226 KeyValueUtil.appendTo(cell, buf, offset, true); 227 } 228 return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId()); 229 } 230 231 /** 232 * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid 233 * out of it 234 * @see #copyToChunkCell(Cell, ByteBuffer, int, int) 235 */ 236 private static Cell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf, int offset, 237 int len) { 238 int tagsLen = cell.getTagsLength(); 239 cell.write(buf, offset); 240 return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId()); 241 } 242 243 private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen, 244 long sequenceId) { 245 // TODO : write the seqid here. For writing seqId we should create a new cell type so 246 // that seqId is not used as the state 247 if (tagsLen == 0) { 248 // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class 249 // which directly return tagsLen as 0. So we avoid parsing many length components in 250 // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell 251 // call getTagsLength(). 
252 return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId); 253 } else { 254 return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId); 255 } 256 } 257 258 /** 259 * Close this instance since it won't be used any more, try to put the chunks 260 * back to pool 261 */ 262 @Override 263 public void close() { 264 this.closed = true; 265 // We could put back the chunks to pool for reusing only when there is no 266 // opening scanner which will read their data 267 int count = openScannerCount.get(); 268 if(count == 0) { 269 recycleChunks(); 270 } 271 } 272 273 @VisibleForTesting 274 int getOpenScannerCount() { 275 return this.openScannerCount.get(); 276 } 277 278 /** 279 * Called when opening a scanner on the data of this MemStoreLAB 280 */ 281 @Override 282 public void incScannerCount() { 283 this.openScannerCount.incrementAndGet(); 284 } 285 286 /** 287 * Called when closing a scanner on the data of this MemStoreLAB 288 */ 289 @Override 290 public void decScannerCount() { 291 int count = this.openScannerCount.decrementAndGet(); 292 if (this.closed && count == 0) { 293 recycleChunks(); 294 } 295 } 296 297 private void recycleChunks() { 298 if (reclaimed.compareAndSet(false, true)) { 299 chunkCreator.putbackChunks(chunks); 300 } 301 } 302 303 /** 304 * Try to retire the current chunk if it is still 305 * <code>c</code>. Postcondition is that curChunk.get() 306 * != c 307 * @param c the chunk to retire 308 */ 309 private void tryRetireChunk(Chunk c) { 310 currChunk.compareAndSet(c, null); 311 // If the CAS succeeds, that means that we won the race 312 // to retire the chunk. We could use this opportunity to 313 // update metrics on external fragmentation. 314 // 315 // If the CAS fails, that means that someone else already 316 // retired the chunk for us. 317 } 318 319 /** 320 * Get the current chunk, or, if there is no current chunk, 321 * allocate a new one from the JVM. 
322 */ 323 private Chunk getOrMakeChunk() { 324 // Try to get the chunk 325 Chunk c = currChunk.get(); 326 if (c != null) { 327 return c; 328 } 329 // No current chunk, so we want to allocate one. We race 330 // against other allocators to CAS in an uninitialized chunk 331 // (which is cheap to allocate) 332 if (lock.tryLock()) { 333 try { 334 // once again check inside the lock 335 c = currChunk.get(); 336 if (c != null) { 337 return c; 338 } 339 c = this.chunkCreator.getChunk(idxType); 340 if (c != null) { 341 // set the curChunk. No need of CAS as only one thread will be here 342 currChunk.set(c); 343 chunks.add(c.getId()); 344 return c; 345 } 346 } finally { 347 lock.unlock(); 348 } 349 } 350 return null; 351 } 352 353 /* Returning a new pool chunk, without replacing current chunk, 354 ** meaning MSLABImpl does not make the returned chunk as CurChunk. 355 ** The space on this chunk will be allocated externally. 356 ** The interface is only for external callers. 357 */ 358 @Override 359 public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) { 360 switch (chunkType) { 361 case INDEX_CHUNK: 362 case DATA_CHUNK: 363 Chunk c = this.chunkCreator.getChunk(chunkType); 364 chunks.add(c.getId()); 365 return c; 366 case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size 367 default: 368 return null; 369 } 370 } 371 372 /* Returning a new chunk, without replacing current chunk, 373 ** meaning MSLABImpl does not make the returned chunk as CurChunk. 374 ** The space on this chunk will be allocated externally. 375 ** The interface is only for external callers. 
376 ** Chunks from pools are not allocated from here, since they have fixed sizes 377 */ 378 @Override 379 public Chunk getNewExternalChunk(int size) { 380 int allocSize = size + ChunkCreator.getInstance().SIZEOF_CHUNK_HEADER; 381 if (allocSize <= ChunkCreator.getInstance().getChunkSize()) { 382 return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK); 383 } else { 384 Chunk c = this.chunkCreator.getJumboChunk(size); 385 chunks.add(c.getId()); 386 return c; 387 } 388 } 389 390 @Override 391 public boolean isOnHeap() { 392 return !isOffHeap(); 393 } 394 395 @Override 396 public boolean isOffHeap() { 397 return this.chunkCreator.isOffheap(); 398 } 399 400 @VisibleForTesting 401 Chunk getCurrentChunk() { 402 return currChunk.get(); 403 } 404 405 @VisibleForTesting 406 BlockingQueue<Chunk> getPooledChunks() { 407 BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>(); 408 for (Integer id : this.chunks) { 409 Chunk chunk = chunkCreator.getChunk(id); 410 if (chunk != null && chunk.isFromPool()) { 411 pooledChunks.add(chunk); 412 } 413 } 414 return pooledChunks; 415 } 416 417 @VisibleForTesting Integer getNumOfChunksReturnedToPool() { 418 int i = 0; 419 for (Integer id : this.chunks) { 420 if (chunkCreator.isChunkInPool(id)) { 421 i++; 422 } 423 } 424 return i; 425 } 426}