/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * A memstore-local allocation buffer.
 * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator that obtains
 * big (2MB) chunks and then doles them out to threads that request
 * slices of a chunk.
 * <p>
 * The purpose of this class is to combat heap fragmentation in the
 * regionserver. By ensuring that all Cells in a given memstore refer
 * only to large chunks of contiguous memory, we ensure that large blocks
 * get freed up when the memstore is flushed.
 * <p>
 * Without the MSLAB, the byte arrays allocated during insertion end up
 * interleaved throughout the heap, and the old generation gets progressively
 * more fragmented until a stop-the-world compacting collection occurs.
 * <p>
 * TODO: we should probably benchmark whether word-aligning the allocations
 * would provide a performance improvement - probably would speed up the
 * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
 * anyway.
 * <p>
 * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}.
 * When a Chunk comes from the pool, it can be either an on heap or an off heap backed chunk.
 * The chunks which this MemStoreLAB creates on its own (when no chunk is available from the
 * pool) will always be on heap backed.
 */
@InterfaceAudience.Private
public class MemStoreLABImpl implements MemStoreLAB {

  static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class);

  // The chunk that allocations are currently served from; null while no chunk is installed.
  private final AtomicReference<Chunk> currChunk = new AtomicReference<>();
  // Lock to manage multiple handlers requesting for a chunk
  private final ReentrantLock lock = new ReentrantLock();

  // A set of (the ids of the) chunks contained by this memstore LAB
  @VisibleForTesting
  Set<Integer> chunks = new ConcurrentSkipListSet<>();
  private final int dataChunkSize;
  private final int maxAlloc;
  private final ChunkCreator chunkCreator;
  // what index is used for corresponding segment
  private final CompactingMemStore.IndexType idxType;

  // This flag is for closing this instance, it is set when clearing snapshot of
  // memstore
  private volatile boolean closed = false;
  // This flag is for reclaiming chunks. It is set when putting chunks back to
  // pool
  private final AtomicBoolean reclaimed = new AtomicBoolean(false);
  // Current count of open scanners which are reading data from this MemStoreLAB
  private final AtomicInteger openScannerCount = new AtomicInteger();

  // Used in testing
  public MemStoreLABImpl() {
    this(new Configuration());
  }

  /**
   * Creates a MemStoreLAB whose chunk size and maximum single allocation size are read from
   * the passed configuration ({@code CHUNK_SIZE_KEY} and {@code MAX_ALLOC_KEY}).
   * @param conf configuration to read the sizes from
   * @throws IllegalArgumentException if the configured max allocation is bigger than the
   *           configured chunk size
   */
  public MemStoreLABImpl(Configuration conf) {
    dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
    this.chunkCreator = ChunkCreator.getInstance();
    // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
    Preconditions.checkArgument(maxAlloc <= dataChunkSize,
        MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);

    // if user requested to work with MSLABs (whether on- or off-heap), then the
    // immutable segments are going to use CellChunkMap as their index
    idxType = CompactingMemStore.IndexType.CHUNK_MAP;
  }

  /**
   * Copies the passed cell into a chunk of this MSLAB.
   * @param cell the cell to copy
   * @return the chunk-backed copy, or {@code null} when the cell's serialized size is bigger
   *         than the configured max allocation and hence is not copied
   */
  @Override
  public Cell copyCellInto(Cell cell) {
    // See head of copyBBECellInto for how it differs from copyCellInto
    return (cell instanceof ByteBufferExtendedCell)
        ? copyBBECellInto((ByteBufferExtendedCell) cell, maxAlloc)
        : copyCellInto(cell, maxAlloc);
  }

  /**
   * When a cell's size is too big (bigger than maxAlloc),
   * copyCellInto does not allocate it on MSLAB.
   * Since the process of flattening to CellChunkMap assumes that
   * all cells are allocated on MSLAB, during this process,
   * the big cells are copied into MSLAB using this method.
   * @param cell the cell to copy
   * @return the chunk-backed copy of the cell
   */
  @Override
  public Cell forceCopyOfBigCellInto(Cell cell) {
    // Pure serialized length of the cell. The chunk header is NOT part of the cell, so it must
    // not leak into the length the new cell is created with.
    int size = cell instanceof ExtendedCell
        ? ((ExtendedCell) cell).getSerializedSize()
        : KeyValueUtil.length(cell);
    Preconditions.checkArgument(size >= 0, "negative size");
    // The cell fits into a regular chunk only if there is still room for the chunk header.
    // Written as a subtraction so that a huge size cannot overflow the comparison.
    if (size <= dataChunkSize - ChunkCreator.SIZEOF_CHUNK_HEADER) {
      // Using copyCellInto for cells which are bigger than the original maxAlloc
      return copyCellInto(cell, dataChunkSize);
    } else {
      // getNewExternalChunk(int) takes the header into account on its own when deciding which
      // kind of chunk to hand out, so it is given the cell size only.
      // NOTE(review): assumes the chunk returned here has room for the chunk header on top of
      // the requested size - confirm against ChunkCreator#getJumboChunk.
      Chunk c = getNewExternalChunk(size);
      int allocOffset = c.alloc(size);
      return copyToChunkCell(cell, c.getData(), allocOffset, size);
    }
  }

  /**
   * Mostly a duplicate of {@link #copyCellInto(Cell, int)} done for perf sake. It presumes
   * ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than the
   * super generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline where
   * before it was too big. Uses less CPU. See HBASE-20875 for evidence.
   * @param cell the cell to copy
   * @param maxAlloc the biggest serialized size that is still copied
   * @return the chunk-backed copy, or {@code null} when the cell is bigger than
   *         {@code maxAlloc}
   * @see #copyCellInto(Cell, int)
   */
  private Cell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) {
    int size = cell.getSerializedSize();
    Preconditions.checkArgument(size >= 0, "negative size");
    // Callers should satisfy large allocations from JVM heap so limit fragmentation.
    if (size > maxAlloc) {
      return null;
    }
    Chunk c = null;
    int allocOffset = 0;
    while (true) {
      // Try to get the chunk
      c = getOrMakeChunk();
      // We may get null because some other thread succeeded in getting the lock
      // and so the current thread has to try again to make its chunk or grab the chunk
      // that the other thread created
      // Try to allocate from this chunk
      if (c != null) {
        allocOffset = c.alloc(size);
        if (allocOffset != -1) {
          // We succeeded - this is the common case - small alloc
          // from a big buffer
          break;
        }
        // not enough space!
        // try to retire this chunk
        tryRetireChunk(c);
      }
    }
    return copyBBECToChunkCell(cell, c.getData(), allocOffset, size);
  }

  /**
   * Copies the passed cell into the current chunk, retiring the current chunk and retrying
   * with a new one for as long as the allocation does not fit.
   * @param cell the cell to copy
   * @param maxAlloc the biggest serialized size that is still copied
   * @return the chunk-backed copy, or {@code null} when the cell is bigger than
   *         {@code maxAlloc}
   * @see #copyBBECellInto(ByteBufferExtendedCell, int)
   */
  private Cell copyCellInto(Cell cell, int maxAlloc) {
    int size = cell instanceof ExtendedCell
        ? ((ExtendedCell) cell).getSerializedSize()
        : KeyValueUtil.length(cell);
    Preconditions.checkArgument(size >= 0, "negative size");
    // Callers should satisfy large allocations directly from JVM since they
    // don't cause fragmentation as badly.
    if (size > maxAlloc) {
      return null;
    }
    Chunk c = null;
    int allocOffset = 0;
    while (true) {
      // Try to get the chunk
      c = getOrMakeChunk();
      // We may get null because some other thread succeeded in getting the lock
      // and so the current thread has to try again to make its chunk or grab the chunk
      // that the other thread created
      // Try to allocate from this chunk
      if (c != null) {
        allocOffset = c.alloc(size);
        if (allocOffset != -1) {
          // We succeeded - this is the common case - small alloc
          // from a big buffer
          break;
        }
        // not enough space!
        // try to retire this chunk
        tryRetireChunk(c);
      }
    }
    return copyToChunkCell(cell, c.getData(), allocOffset, size);
  }

  /**
   * Clone the passed cell by copying its data into the passed buf and create a cell with a
   * chunkid out of it
   * @param cell the cell to clone
   * @param buf the buffer to write the cell into
   * @param offset the position in buf at which the cell is written
   * @param len the length the new cell is created with
   * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int)
   */
  private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
    int tagsLen = cell.getTagsLength();
    if (cell instanceof ExtendedCell) {
      ((ExtendedCell) cell).write(buf, offset);
    } else {
      // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
      // other case also. The data fragments within Cell is copied into buf as in KeyValue
      // serialization format only.
      KeyValueUtil.appendTo(cell, buf, offset, true);
    }
    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
  }

  /**
   * Clone the passed cell by copying its data into the passed buf and create a cell with a
   * chunkid out of it
   * @param cell the cell to clone
   * @param buf the buffer to write the cell into
   * @param offset the position in buf at which the cell is written
   * @param len the length the new cell is created with
   * @see #copyToChunkCell(Cell, ByteBuffer, int, int)
   */
  private static Cell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf, int offset,
      int len) {
    int tagsLen = cell.getTagsLength();
    cell.write(buf, offset);
    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
  }

  /**
   * Creates the cell object over the data that was already written into buf, picking the
   * no-tags flavor when the cell carries no tags.
   */
  private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen,
      long sequenceId) {
    // TODO : write the seqid here. For writing seqId we should create a new cell type so
    // that seqId is not used as the state
    if (tagsLen == 0) {
      // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
      // which directly return tagsLen as 0. So we avoid parsing many length components in
      // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
      // call getTagsLength().
      return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId);
    } else {
      return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId);
    }
  }

  /**
   * Close this instance since it won't be used any more, try to put the chunks
   * back to pool
   */
  @Override
  public void close() {
    this.closed = true;
    // We could put back the chunks to pool for reusing only when there is no
    // opening scanner which will read their data
    int count = openScannerCount.get();
    if (count == 0) {
      recycleChunks();
    }
  }

  /**
   * Called when opening a scanner on the data of this MemStoreLAB
   */
  @Override
  public void incScannerCount() {
    this.openScannerCount.incrementAndGet();
  }

  /**
   * Called when closing a scanner on the data of this MemStoreLAB
   */
  @Override
  public void decScannerCount() {
    int count = this.openScannerCount.decrementAndGet();
    // The last scanner to go away after close() is the one that hands the chunks back.
    if (this.closed && count == 0) {
      recycleChunks();
    }
  }

  /**
   * Hands the chunks of this MSLAB back to the chunk creator. The reclaimed flag makes sure
   * this happens at most once.
   */
  private void recycleChunks() {
    if (reclaimed.compareAndSet(false, true)) {
      chunkCreator.putbackChunks(chunks);
    }
  }

  /**
   * Try to retire the current chunk if it is still
   * <code>c</code>. Postcondition is that curChunk.get()
   * != c
   * @param c the chunk to retire
   */
  private void tryRetireChunk(Chunk c) {
    currChunk.compareAndSet(c, null);
    // If the CAS succeeds, that means that we won the race
    // to retire the chunk. We could use this opportunity to
    // update metrics on external fragmentation.
    //
    // If the CAS fails, that means that someone else already
    // retired the chunk for us.
  }

  /**
   * Get the current chunk, or, if there is no current chunk,
   * ask the chunk creator for a new one and install it as the current chunk.
   * @return the current chunk, or {@code null} when the lock could not be taken right away or
   *         the chunk creator did not return a chunk; callers are expected to retry
   */
  private Chunk getOrMakeChunk() {
    // Try to get the chunk
    Chunk c = currChunk.get();
    if (c != null) {
      return c;
    }
    // No current chunk, so we want to allocate one. Only the thread that gets the lock
    // creates the chunk; the others return null and retry.
    if (lock.tryLock()) {
      try {
        // once again check inside the lock
        c = currChunk.get();
        if (c != null) {
          return c;
        }
        c = this.chunkCreator.getChunk(idxType);
        if (c != null) {
          // set the curChunk. No need of CAS as only one thread will be here
          currChunk.set(c);
          chunks.add(c.getId());
          return c;
        }
      } finally {
        lock.unlock();
      }
    }
    return null;
  }

  /* Returning a new pool chunk, without replacing current chunk,
  ** meaning MSLABImpl does not make the returned chunk as CurChunk.
  ** The space on this chunk will be allocated externally.
  ** The interface is only for external callers.
  ** Returns null for chunk types other than INDEX_CHUNK and DATA_CHUNK.
  */
  @Override
  public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) {
    switch (chunkType) {
      case INDEX_CHUNK:
      case DATA_CHUNK:
        Chunk c = this.chunkCreator.getChunk(chunkType);
        chunks.add(c.getId());
        return c;
      case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size
      default:
        return null;
    }
  }

  /* Returning a new chunk, without replacing current chunk,
  ** meaning MSLABImpl does not make the returned chunk as CurChunk.
  ** The space on this chunk will be allocated externally.
  ** The interface is only for external callers.
  ** Chunks from pools are not allocated from here, since they have fixed sizes
  */
  @Override
  public Chunk getNewExternalChunk(int size) {
    // SIZEOF_CHUNK_HEADER is a static constant, so it is read through the class rather than
    // through an instance.
    int allocSize = size + ChunkCreator.SIZEOF_CHUNK_HEADER;
    if (allocSize <= ChunkCreator.getInstance().getChunkSize()) {
      return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK);
    } else {
      Chunk c = this.chunkCreator.getJumboChunk(size);
      chunks.add(c.getId());
      return c;
    }
  }

  @Override
  public boolean isOnHeap() {
    return !isOffHeap();
  }

  @Override
  public boolean isOffHeap() {
    return this.chunkCreator.isOffheap();
  }

  @VisibleForTesting
  Chunk getCurrentChunk() {
    return currChunk.get();
  }

  /**
   * @return a new queue holding those chunks of this MSLAB which the chunk creator still knows
   *         and which report that they came from the pool
   */
  @VisibleForTesting
  BlockingQueue<Chunk> getPooledChunks() {
    BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
    for (Integer id : this.chunks) {
      Chunk chunk = chunkCreator.getChunk(id);
      if (chunk != null && chunk.isFromPool()) {
        pooledChunks.add(chunk);
      }
    }
    return pooledChunks;
  }

  /**
   * @return how many of the chunks of this MSLAB the chunk creator reports as being in the pool
   */
  @VisibleForTesting
  Integer getNumOfChunksReturnedToPool() {
    int i = 0;
    for (Integer id : this.chunks) {
      if (chunkCreator.isChunkInPool(id)) {
        i++;
      }
    }
    return i;
  }
}