001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.nio.ByteBuffer; 022import java.util.Set; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.ConcurrentSkipListSet; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicInteger; 028import java.util.concurrent.atomic.AtomicReference; 029import java.util.concurrent.locks.ReentrantLock; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.ByteBufferExtendedCell; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.ExtendedCell; 034import org.apache.hadoop.hbase.KeyValueUtil; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 040 041/** 042 * A memstore-local allocation buffer. 043 * <p> 044 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates 045 * big (2MB) byte[] chunks from and then doles it out to threads that request 046 * slices into the array. 047 * <p> 048 * The purpose of this class is to combat heap fragmentation in the 049 * regionserver. By ensuring that all Cells in a given memstore refer 050 * only to large chunks of contiguous memory, we ensure that large blocks 051 * get freed up when the memstore is flushed. 052 * <p> 053 * Without the MSLAB, the byte array allocated during insertion end up 054 * interleaved throughout the heap, and the old generation gets progressively 055 * more fragmented until a stop-the-world compacting collection occurs. 056 * <p> 057 * TODO: we should probably benchmark whether word-aligning the allocations 058 * would provide a performance improvement - probably would speed up the 059 * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached 060 * anyway. 061 * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}. 062 * When the Chunk comes from pool, it can be either an on heap or an off heap backed chunk. The chunks, 063 * which this MemStoreLAB creates on its own (when no chunk available from pool), those will be 064 * always on heap backed. 065 */ 066@InterfaceAudience.Private 067public class MemStoreLABImpl implements MemStoreLAB { 068 069 static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class); 070 071 private AtomicReference<Chunk> currChunk = new AtomicReference<>(); 072 // Lock to manage multiple handlers requesting for a chunk 073 private ReentrantLock lock = new ReentrantLock(); 074 075 // A set of chunks contained by this memstore LAB 076 Set<Integer> chunks = new ConcurrentSkipListSet<Integer>(); 077 private final int dataChunkSize; 078 private final int maxAlloc; 079 private final ChunkCreator chunkCreator; 080 private final CompactingMemStore.IndexType idxType; // what index is used for corresponding segment 081 082 // This flag is for closing this instance, its set when clearing snapshot of 083 // memstore 084 private volatile boolean closed = false; 085 // This flag is for reclaiming chunks. Its set when putting chunks back to 086 // pool 087 private AtomicBoolean reclaimed = new AtomicBoolean(false); 088 // Current count of open scanners which reading data from this MemStoreLAB 089 private final AtomicInteger openScannerCount = new AtomicInteger(); 090 091 // Used in testing 092 public MemStoreLABImpl() { 093 this(new Configuration()); 094 } 095 096 public MemStoreLABImpl(Configuration conf) { 097 dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); 098 maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); 099 this.chunkCreator = ChunkCreator.getInstance(); 100 // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! 101 Preconditions.checkArgument(maxAlloc <= dataChunkSize, 102 MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); 103 104 // if user requested to work with MSLABs (whether on- or off-heap), then the 105 // immutable segments are going to use CellChunkMap as their index 106 idxType = CompactingMemStore.IndexType.CHUNK_MAP; 107 } 108 109 @Override 110 public Cell copyCellInto(Cell cell) { 111 // See head of copyBBECellInto for how it differs from copyCellInto 112 return (cell instanceof ByteBufferExtendedCell)? 113 copyBBECellInto((ByteBufferExtendedCell)cell, maxAlloc): 114 copyCellInto(cell, maxAlloc); 115 } 116 117 /** 118 * When a cell's size is too big (bigger than maxAlloc), 119 * copyCellInto does not allocate it on MSLAB. 120 * Since the process of flattening to CellChunkMap assumes that 121 * all cells are allocated on MSLAB, during this process, 122 * the big cells are copied into MSLAB using this method. 123 */ 124 @Override 125 public Cell forceCopyOfBigCellInto(Cell cell) { 126 int size = Segment.getCellLength(cell); 127 size += ChunkCreator.SIZEOF_CHUNK_HEADER; 128 Preconditions.checkArgument(size >= 0, "negative size"); 129 if (size <= dataChunkSize) { 130 // Using copyCellInto for cells which are bigger than the original maxAlloc 131 return copyCellInto(cell, dataChunkSize); 132 } else { 133 Chunk c = getNewExternalChunk(size); 134 int allocOffset = c.alloc(size); 135 return copyToChunkCell(cell, c.getData(), allocOffset, size); 136 } 137 } 138 139 /** 140 * Mostly a duplicate of {@link #copyCellInto(Cell, int)}} done for perf sake. It presumes 141 * ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than the 142 * super generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline where 143 * before it was too big. Uses less CPU. See HBASE-20875 for evidence. 144 * @see #copyCellInto(Cell, int) 145 */ 146 private Cell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) { 147 int size = cell.getSerializedSize(); 148 Preconditions.checkArgument(size >= 0, "negative size"); 149 // Callers should satisfy large allocations from JVM heap so limit fragmentation. 150 if (size > maxAlloc) { 151 return null; 152 } 153 Chunk c = null; 154 int allocOffset = 0; 155 while (true) { 156 // Try to get the chunk 157 c = getOrMakeChunk(); 158 // We may get null because the some other thread succeeded in getting the lock 159 // and so the current thread has to try again to make its chunk or grab the chunk 160 // that the other thread created 161 // Try to allocate from this chunk 162 if (c != null) { 163 allocOffset = c.alloc(size); 164 if (allocOffset != -1) { 165 // We succeeded - this is the common case - small alloc 166 // from a big buffer 167 break; 168 } 169 // not enough space! 170 // try to retire this chunk 171 tryRetireChunk(c); 172 } 173 } 174 return copyBBECToChunkCell(cell, c.getData(), allocOffset, size); 175 } 176 177 /** 178 * @see #copyBBECellInto(ByteBufferExtendedCell, int) 179 */ 180 private Cell copyCellInto(Cell cell, int maxAlloc) { 181 int size = Segment.getCellLength(cell); 182 Preconditions.checkArgument(size >= 0, "negative size"); 183 // Callers should satisfy large allocations directly from JVM since they 184 // don't cause fragmentation as badly. 185 if (size > maxAlloc) { 186 return null; 187 } 188 Chunk c = null; 189 int allocOffset = 0; 190 while (true) { 191 // Try to get the chunk 192 c = getOrMakeChunk(); 193 // we may get null because the some other thread succeeded in getting the lock 194 // and so the current thread has to try again to make its chunk or grab the chunk 195 // that the other thread created 196 // Try to allocate from this chunk 197 if (c != null) { 198 allocOffset = c.alloc(size); 199 if (allocOffset != -1) { 200 // We succeeded - this is the common case - small alloc 201 // from a big buffer 202 break; 203 } 204 // not enough space! 205 // try to retire this chunk 206 tryRetireChunk(c); 207 } 208 } 209 return copyToChunkCell(cell, c.getData(), allocOffset, size); 210 } 211 212 /** 213 * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid 214 * out of it 215 * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int) 216 */ 217 private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) { 218 int tagsLen = cell.getTagsLength(); 219 if (cell instanceof ExtendedCell) { 220 ((ExtendedCell) cell).write(buf, offset); 221 } else { 222 // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the 223 // other case also. The data fragments within Cell is copied into buf as in KeyValue 224 // serialization format only. 225 KeyValueUtil.appendTo(cell, buf, offset, true); 226 } 227 return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId()); 228 } 229 230 /** 231 * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid 232 * out of it 233 * @see #copyToChunkCell(Cell, ByteBuffer, int, int) 234 */ 235 private static Cell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf, int offset, 236 int len) { 237 int tagsLen = cell.getTagsLength(); 238 cell.write(buf, offset); 239 return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId()); 240 } 241 242 private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen, 243 long sequenceId) { 244 // TODO : write the seqid here. For writing seqId we should create a new cell type so 245 // that seqId is not used as the state 246 if (tagsLen == 0) { 247 // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class 248 // which directly return tagsLen as 0. So we avoid parsing many length components in 249 // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell 250 // call getTagsLength(). 251 return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId); 252 } else { 253 return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId); 254 } 255 } 256 257 /** 258 * Close this instance since it won't be used any more, try to put the chunks 259 * back to pool 260 */ 261 @Override 262 public void close() { 263 this.closed = true; 264 // We could put back the chunks to pool for reusing only when there is no 265 // opening scanner which will read their data 266 int count = openScannerCount.get(); 267 if(count == 0) { 268 recycleChunks(); 269 } 270 } 271 272 int getOpenScannerCount() { 273 return this.openScannerCount.get(); 274 } 275 276 /** 277 * Called when opening a scanner on the data of this MemStoreLAB 278 */ 279 @Override 280 public void incScannerCount() { 281 this.openScannerCount.incrementAndGet(); 282 } 283 284 /** 285 * Called when closing a scanner on the data of this MemStoreLAB 286 */ 287 @Override 288 public void decScannerCount() { 289 int count = this.openScannerCount.decrementAndGet(); 290 if (this.closed && count == 0) { 291 recycleChunks(); 292 } 293 } 294 295 private void recycleChunks() { 296 if (reclaimed.compareAndSet(false, true)) { 297 chunkCreator.putbackChunks(chunks); 298 } 299 } 300 301 /** 302 * Try to retire the current chunk if it is still 303 * <code>c</code>. Postcondition is that curChunk.get() 304 * != c 305 * @param c the chunk to retire 306 */ 307 private void tryRetireChunk(Chunk c) { 308 currChunk.compareAndSet(c, null); 309 // If the CAS succeeds, that means that we won the race 310 // to retire the chunk. We could use this opportunity to 311 // update metrics on external fragmentation. 312 // 313 // If the CAS fails, that means that someone else already 314 // retired the chunk for us. 315 } 316 317 /** 318 * Get the current chunk, or, if there is no current chunk, 319 * allocate a new one from the JVM. 320 */ 321 private Chunk getOrMakeChunk() { 322 // Try to get the chunk 323 Chunk c = currChunk.get(); 324 if (c != null) { 325 return c; 326 } 327 // No current chunk, so we want to allocate one. We race 328 // against other allocators to CAS in an uninitialized chunk 329 // (which is cheap to allocate) 330 if (lock.tryLock()) { 331 try { 332 // once again check inside the lock 333 c = currChunk.get(); 334 if (c != null) { 335 return c; 336 } 337 c = this.chunkCreator.getChunk(idxType); 338 if (c != null) { 339 // set the curChunk. No need of CAS as only one thread will be here 340 currChunk.set(c); 341 chunks.add(c.getId()); 342 return c; 343 } 344 } finally { 345 lock.unlock(); 346 } 347 } 348 return null; 349 } 350 351 /* Returning a new pool chunk, without replacing current chunk, 352 ** meaning MSLABImpl does not make the returned chunk as CurChunk. 353 ** The space on this chunk will be allocated externally. 354 ** The interface is only for external callers. 355 */ 356 @Override 357 public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) { 358 switch (chunkType) { 359 case INDEX_CHUNK: 360 case DATA_CHUNK: 361 Chunk c = this.chunkCreator.getChunk(chunkType); 362 chunks.add(c.getId()); 363 return c; 364 case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size 365 default: 366 return null; 367 } 368 } 369 370 /* Returning a new chunk, without replacing current chunk, 371 ** meaning MSLABImpl does not make the returned chunk as CurChunk. 372 ** The space on this chunk will be allocated externally. 373 ** The interface is only for external callers. 374 ** Chunks from pools are not allocated from here, since they have fixed sizes 375 */ 376 @Override 377 public Chunk getNewExternalChunk(int size) { 378 int allocSize = size + ChunkCreator.SIZEOF_CHUNK_HEADER; 379 if (allocSize <= ChunkCreator.getInstance().getChunkSize()) { 380 return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK); 381 } else { 382 Chunk c = this.chunkCreator.getJumboChunk(size); 383 chunks.add(c.getId()); 384 return c; 385 } 386 } 387 388 @Override 389 public boolean isOnHeap() { 390 return !isOffHeap(); 391 } 392 393 @Override 394 public boolean isOffHeap() { 395 return this.chunkCreator.isOffheap(); 396 } 397 398 Chunk getCurrentChunk() { 399 return currChunk.get(); 400 } 401 402 BlockingQueue<Chunk> getPooledChunks() { 403 BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>(); 404 for (Integer id : this.chunks) { 405 Chunk chunk = chunkCreator.getChunk(id); 406 if (chunk != null && chunk.isFromPool()) { 407 pooledChunks.add(chunk); 408 } 409 } 410 return pooledChunks; 411 } 412 413 Integer getNumOfChunksReturnedToPool() { 414 int i = 0; 415 for (Integer id : this.chunks) { 416 if (chunkCreator.isChunkInPool(id)) { 417 i++; 418 } 419 } 420 return i; 421 } 422}