/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * A memstore-local allocation buffer.
 * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
 * big (2MB) chunks and then doles them out to threads that request
 * slices into them.
 * <p>
 * The purpose of this class is to combat heap fragmentation in the
 * regionserver. By ensuring that all Cells in a given memstore refer
 * only to large chunks of contiguous memory, we ensure that large blocks
 * get freed up when the memstore is flushed.
 * <p>
 * Without the MSLAB, the byte arrays allocated during insertion end up
 * interleaved throughout the heap, and the old generation gets progressively
 * more fragmented until a stop-the-world compacting collection occurs.
 * <p>
 * TODO: we should probably benchmark whether word-aligning the allocations
 * would provide a performance improvement - probably would speed up the
 * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
 * anyway.
 * <p>
 * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}.
 * When a Chunk comes from the pool, it can be either an on heap or an off heap backed chunk.
 * The chunks which this MemStoreLAB creates on its own (when no chunk is available from the
 * pool) will always be on heap backed.
 */
@InterfaceAudience.Private
public class MemStoreLABImpl implements MemStoreLAB {

  static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class);

  // The chunk allocations are currently served from; null while no chunk is installed
  private final AtomicReference<Chunk> currChunk = new AtomicReference<>();
  // Lock to manage multiple handlers requesting for a chunk
  private final ReentrantLock lock = new ReentrantLock();

  // A set of (ids of) chunks contained by this memstore LAB
  @VisibleForTesting
  Set<Integer> chunks = new ConcurrentSkipListSet<>();
  private final int dataChunkSize;
  private final int maxAlloc;
  private final ChunkCreator chunkCreator;
  // what index is used for corresponding segment
  private final CompactingMemStore.IndexType idxType;

  // This flag is for closing this instance, it's set when clearing snapshot of
  // memstore
  private volatile boolean closed = false;
  // This flag is for reclaiming chunks. It's set when putting chunks back to
  // pool
  private final AtomicBoolean reclaimed = new AtomicBoolean(false);
  // Current count of open scanners which are reading data from this MemStoreLAB
  private final AtomicInteger openScannerCount = new AtomicInteger();

  // Used in testing
  public MemStoreLABImpl() {
    this(new Configuration());
  }

  /**
   * Creates an MSLAB whose chunk size and maximum single allocation size are read from
   * <code>conf</code> (CHUNK_SIZE_KEY / MAX_ALLOC_KEY, falling back to their defaults).
   * @param conf configuration to read the sizes from
   * @throws IllegalArgumentException if the configured max allocation exceeds the chunk size
   */
  public MemStoreLABImpl(Configuration conf) {
    dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
    this.chunkCreator = ChunkCreator.getInstance();
    // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
    Preconditions.checkArgument(maxAlloc <= dataChunkSize,
        MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);

    // if user requested to work with MSLABs (whether on- or off-heap), then the
    // immutable segments are going to use CellChunkMap as their index
    idxType = CompactingMemStore.IndexType.CHUNK_MAP;
  }

  /**
   * Copies the given cell into a chunk of this MSLAB.
   * @param cell the cell to copy
   * @return the chunk-backed copy, or null when the cell is bigger than the configured
   *         max allocation size and therefore was not copied
   */
  @Override
  public Cell copyCellInto(Cell cell) {
    // See head of copyBBECellInto for how it differs from copyCellInto
    return (cell instanceof ByteBufferExtendedCell)
        ? copyBBECellInto((ByteBufferExtendedCell) cell, maxAlloc)
        : copyCellInto(cell, maxAlloc);
  }

  /**
   * When a cell's size is too big (bigger than maxAlloc),
   * copyCellInto does not allocate it on MSLAB.
   * Since the process of flattening to CellChunkMap assumes that
   * all cells are allocated on MSLAB, during this process,
   * the big cells are copied into MSLAB using this method.
   * @param cell the cell to copy
   * @return the chunk-backed copy of the cell
   */
  @Override
  public Cell forceCopyOfBigCellInto(Cell cell) {
    // Keep 'size' as the pure serialized length of the cell: it is used both as the number of
    // bytes to allocate and as the length of the resulting chunk cell. The chunk header is
    // only taken into account when deciding whether the cell fits into a regular data chunk.
    int size = serializedSizeOf(cell);
    Preconditions.checkArgument(size >= 0, "negative size");
    if (size + ChunkCreator.SIZEOF_CHUNK_HEADER <= dataChunkSize) {
      // Using copyCellInto for cells which are bigger than the original maxAlloc
      return copyCellInto(cell, dataChunkSize);
    } else {
      // getNewExternalChunk(int) adds the chunk header on its own when sizing the chunk
      Chunk c = getNewExternalChunk(size);
      int allocOffset = c.alloc(size);
      return copyToChunkCell(cell, c.getData(), allocOffset, size);
    }
  }

  /**
   * Mostly a duplicate of {@link #copyCellInto(Cell, int)} done for perf sake. It presumes
   * ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than the
   * super generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline where
   * before it was too big. Uses less CPU. See HBASE-20875 for evidence.
   * @return the chunk-backed copy, or null if the cell is bigger than <code>maxAlloc</code>
   * @see #copyCellInto(Cell, int)
   */
  private Cell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) {
    int size = cell.getSerializedSize();
    Preconditions.checkArgument(size >= 0, "negative size");
    // Callers should satisfy large allocations from JVM heap so limit fragmentation.
    if (size > maxAlloc) {
      return null;
    }
    Chunk c = null;
    int allocOffset = 0;
    while (true) {
      // Try to get the chunk
      c = getOrMakeChunk();
      // We may get null because some other thread succeeded in getting the lock
      // and so the current thread has to try again to make its chunk or grab the chunk
      // that the other thread created
      // Try to allocate from this chunk
      if (c != null) {
        allocOffset = c.alloc(size);
        if (allocOffset != -1) {
          // We succeeded - this is the common case - small alloc
          // from a big buffer
          break;
        }
        // not enough space!
        // try to retire this chunk
        tryRetireChunk(c);
      }
    }
    return copyBBECToChunkCell(cell, c.getData(), allocOffset, size);
  }

  /**
   * Copies the cell into the current chunk, retiring full chunks and retrying until an
   * allocation succeeds.
   * @return the chunk-backed copy, or null if the cell is bigger than <code>maxAlloc</code>
   * @see #copyBBECellInto(ByteBufferExtendedCell, int)
   */
  private Cell copyCellInto(Cell cell, int maxAlloc) {
    int size = serializedSizeOf(cell);
    Preconditions.checkArgument(size >= 0, "negative size");
    // Callers should satisfy large allocations directly from JVM since they
    // don't cause fragmentation as badly.
    if (size > maxAlloc) {
      return null;
    }
    Chunk c = null;
    int allocOffset = 0;
    while (true) {
      // Try to get the chunk
      c = getOrMakeChunk();
      // We may get null because some other thread succeeded in getting the lock
      // and so the current thread has to try again to make its chunk or grab the chunk
      // that the other thread created
      // Try to allocate from this chunk
      if (c != null) {
        allocOffset = c.alloc(size);
        if (allocOffset != -1) {
          // We succeeded - this is the common case - small alloc
          // from a big buffer
          break;
        }
        // not enough space!
        // try to retire this chunk
        tryRetireChunk(c);
      }
    }
    return copyToChunkCell(cell, c.getData(), allocOffset, size);
  }

  /**
   * Returns the number of bytes the cell occupies when serialized: the cell's own serialized
   * size for an ExtendedCell, otherwise the length computed by KeyValueUtil.
   */
  private static int serializedSizeOf(Cell cell) {
    return cell instanceof ExtendedCell
        ? ((ExtendedCell) cell).getSerializedSize()
        : KeyValueUtil.length(cell);
  }

  /**
   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
   * out of it
   * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int)
   */
  private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
    int tagsLen = cell.getTagsLength();
    if (cell instanceof ExtendedCell) {
      ((ExtendedCell) cell).write(buf, offset);
    } else {
      // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
      // other case also. The data fragments within Cell is copied into buf as in KeyValue
      // serialization format only.
      KeyValueUtil.appendTo(cell, buf, offset, true);
    }
    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
  }

  /**
   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
   * out of it
   * @see #copyToChunkCell(Cell, ByteBuffer, int, int)
   */
  private static Cell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf, int offset,
      int len) {
    int tagsLen = cell.getTagsLength();
    cell.write(buf, offset);
    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
  }

  /**
   * Wraps the <code>len</code> bytes at <code>offset</code> of <code>buf</code> into a chunk
   * cell carrying the given sequence id.
   */
  private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen,
      long sequenceId) {
    // TODO : write the seqid here. For writing seqId we should create a new cell type so
    // that seqId is not used as the state
    if (tagsLen == 0) {
      // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
      // which directly return tagsLen as 0. So we avoid parsing many length components in
      // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
      // call getTagsLength().
      return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId);
    } else {
      return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId);
    }
  }

  /**
   * Close this instance since it won't be used any more, try to put the chunks
   * back to pool
   */
  @Override
  public void close() {
    this.closed = true;
    // We could put back the chunks to pool for reusing only when there is no
    // opening scanner which will read their data
    int count = openScannerCount.get();
    if (count == 0) {
      recycleChunks();
    }
  }

  @VisibleForTesting
  int getOpenScannerCount() {
    return this.openScannerCount.get();
  }

  /**
   * Called when opening a scanner on the data of this MemStoreLAB
   */
  @Override
  public void incScannerCount() {
    this.openScannerCount.incrementAndGet();
  }

  /**
   * Called when closing a scanner on the data of this MemStoreLAB. If this instance is already
   * closed and this was the last open scanner, the chunks are handed back to the chunk creator.
   */
  @Override
  public void decScannerCount() {
    int count = this.openScannerCount.decrementAndGet();
    if (this.closed && count == 0) {
      recycleChunks();
    }
  }

  /**
   * Hands the chunks of this MSLAB back to the chunk creator. The CAS on <code>reclaimed</code>
   * makes sure this happens at most once even if close() and decScannerCount() race.
   */
  private void recycleChunks() {
    if (reclaimed.compareAndSet(false, true)) {
      chunkCreator.putbackChunks(chunks);
    }
  }

  /**
   * Try to retire the current chunk if it is still
   * <code>c</code>. Postcondition is that curChunk.get()
   * != c
   * @param c the chunk to retire
   */
  private void tryRetireChunk(Chunk c) {
    currChunk.compareAndSet(c, null);
    // If the CAS succeeds, that means that we won the race
    // to retire the chunk. We could use this opportunity to
    // update metrics on external fragmentation.
    //
    // If the CAS fails, that means that someone else already
    // retired the chunk for us.
  }

  /**
   * Get the current chunk, or, if there is no current chunk,
   * ask the chunk creator for a new one.
   * @return the current chunk, or null if another thread holds the allocation lock (or no chunk
   *         could be obtained); callers are expected to retry
   */
  private Chunk getOrMakeChunk() {
    // Try to get the chunk
    Chunk c = currChunk.get();
    if (c != null) {
      return c;
    }
    // No current chunk, so we want to allocate one. Only the thread which wins the
    // lock gets to install the new chunk; the others return null and retry.
    if (lock.tryLock()) {
      try {
        // once again check inside the lock
        c = currChunk.get();
        if (c != null) {
          return c;
        }
        c = this.chunkCreator.getChunk(idxType);
        if (c != null) {
          // set the curChunk. No need of CAS as only one thread will be here
          currChunk.set(c);
          chunks.add(c.getId());
          return c;
        }
      } finally {
        lock.unlock();
      }
    }
    return null;
  }

  /* Returning a new pool chunk, without replacing current chunk,
  ** meaning MSLABImpl does not make the returned chunk as CurChunk.
  ** The space on this chunk will be allocated externally.
  ** The interface is only for external callers.
  ** Returns null for chunk types without a fixed size (jumbo) and for unknown types.
  */
  @Override
  public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) {
    switch (chunkType) {
      case INDEX_CHUNK:
      case DATA_CHUNK:
        Chunk c = this.chunkCreator.getChunk(chunkType);
        chunks.add(c.getId());
        return c;
      case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size
      default:
        return null;
    }
  }

  /* Returning a new chunk, without replacing current chunk,
  ** meaning MSLABImpl does not make the returned chunk as CurChunk.
  ** The space on this chunk will be allocated externally.
  ** The interface is only for external callers.
  ** Chunks from pools are not allocated from here, since they have fixed sizes
  ** 'size' is the payload size; the chunk header is added here when choosing the chunk kind.
  */
  @Override
  public Chunk getNewExternalChunk(int size) {
    int allocSize = size + ChunkCreator.SIZEOF_CHUNK_HEADER;
    if (allocSize <= ChunkCreator.getInstance().getChunkSize()) {
      return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK);
    } else {
      Chunk c = this.chunkCreator.getJumboChunk(size);
      chunks.add(c.getId());
      return c;
    }
  }

  @Override
  public boolean isOnHeap() {
    return !isOffHeap();
  }

  @Override
  public boolean isOffHeap() {
    return this.chunkCreator.isOffheap();
  }

  @VisibleForTesting
  Chunk getCurrentChunk() {
    return currChunk.get();
  }

  /**
   * Collects those chunks of this MSLAB which the chunk creator still knows about and which
   * report that they come from the pool.
   */
  @VisibleForTesting
  BlockingQueue<Chunk> getPooledChunks() {
    BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
    for (Integer id : this.chunks) {
      Chunk chunk = chunkCreator.getChunk(id);
      if (chunk != null && chunk.isFromPool()) {
        pooledChunks.add(chunk);
      }
    }
    return pooledChunks;
  }

  /**
   * Counts the chunks of this MSLAB for which the chunk creator reports that they are in the
   * pool.
   */
  @VisibleForTesting
  Integer getNumOfChunksReturnedToPool() {
    int i = 0;
    for (Integer id : this.chunks) {
      if (chunkCreator.isChunkInPool(id)) {
        i++;
      }
    }
    return i;
  }
}