001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.nio.ByteBuffer; 022import java.util.Set; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.ConcurrentSkipListSet; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicReference; 028import java.util.concurrent.locks.ReentrantLock; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.ByteBufferExtendedCell; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.ExtendedCell; 033import org.apache.hadoop.hbase.KeyValueUtil; 034import org.apache.hadoop.hbase.nio.RefCnt; 035import org.apache.hadoop.hbase.regionserver.CompactingMemStore.IndexType; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 041 042/** 043 * A memstore-local allocation buffer. 044 * <p> 045 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) byte[] chunks 046 * from and then doles it out to threads that request slices into the array. 047 * <p> 048 * The purpose of this class is to combat heap fragmentation in the regionserver. By ensuring that 049 * all Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that 050 * large blocks get freed up when the memstore is flushed. 051 * <p> 052 * Without the MSLAB, the byte array allocated during insertion end up interleaved throughout the 053 * heap, and the old generation gets progressively more fragmented until a stop-the-world compacting 054 * collection occurs. 055 * <p> 056 * TODO: we should probably benchmark whether word-aligning the allocations would provide a 057 * performance improvement - probably would speed up the Bytes.toLong/Bytes.toInt calls in KeyValue, 058 * but some of those are cached anyway. The chunks created by this MemStoreLAB can get pooled at 059 * {@link ChunkCreator}. When the Chunk comes from pool, it can be either an on heap or an off heap 060 * backed chunk. The chunks, which this MemStoreLAB creates on its own (when no chunk available from 061 * pool), those will be always on heap backed. 062 * <p> 063 * NOTE:if user requested to work with MSLABs (whether on- or off-heap), in 064 * {@link CompactingMemStore} ctor, the {@link CompactingMemStore#indexType} could only be 065 * {@link IndexType#CHUNK_MAP},that is to say the immutable segments using MSLABs are going to use 066 * {@link CellChunkMap} as their index. 067 */ 068@InterfaceAudience.Private 069public class MemStoreLABImpl implements MemStoreLAB { 070 071 static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class); 072 073 private AtomicReference<Chunk> currChunk = new AtomicReference<>(); 074 // Lock to manage multiple handlers requesting for a chunk 075 private ReentrantLock lock = new ReentrantLock(); 076 077 // A set of chunks contained by this memstore LAB 078 Set<Integer> chunks = new ConcurrentSkipListSet<Integer>(); 079 private final int dataChunkSize; 080 private final int maxAlloc; 081 private final ChunkCreator chunkCreator; 082 083 // This flag is for closing this instance, its set when clearing snapshot of 084 // memstore 085 private final AtomicBoolean closed = new AtomicBoolean(false);; 086 // This flag is for reclaiming chunks. Its set when putting chunks back to 087 // pool 088 private final AtomicBoolean reclaimed = new AtomicBoolean(false); 089 /** 090 * Its initial value is 1, so it is one bigger than the current count of open scanners which 091 * reading data from this MemStoreLAB. 092 */ 093 private final RefCnt refCnt; 094 095 // Used in testing 096 public MemStoreLABImpl() { 097 this(new Configuration()); 098 } 099 100 public MemStoreLABImpl(Configuration conf) { 101 dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); 102 maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); 103 this.chunkCreator = ChunkCreator.getInstance(); 104 // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! 105 Preconditions.checkArgument(maxAlloc <= dataChunkSize, 106 MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); 107 108 this.refCnt = RefCnt.create(() -> { 109 recycleChunks(); 110 }); 111 112 } 113 114 @Override 115 public Cell copyCellInto(Cell cell) { 116 // See head of copyBBECellInto for how it differs from copyCellInto 117 return (cell instanceof ByteBufferExtendedCell) 118 ? copyBBECellInto((ByteBufferExtendedCell) cell, maxAlloc) 119 : copyCellInto(cell, maxAlloc); 120 } 121 122 /** 123 * When a cell's size is too big (bigger than maxAlloc), copyCellInto does not allocate it on 124 * MSLAB. Since the process of flattening to CellChunkMap assumes that all cells are allocated on 125 * MSLAB, during this process, the big cells are copied into MSLAB using this method. 126 */ 127 @Override 128 public Cell forceCopyOfBigCellInto(Cell cell) { 129 int size = Segment.getCellLength(cell); 130 Preconditions.checkArgument(size >= 0, "negative size"); 131 if (size + ChunkCreator.SIZEOF_CHUNK_HEADER <= dataChunkSize) { 132 // Using copyCellInto for cells which are bigger than the original maxAlloc 133 return copyCellInto(cell, dataChunkSize); 134 } else { 135 Chunk c = getNewExternalChunk(size); 136 int allocOffset = c.alloc(size); 137 return copyToChunkCell(cell, c.getData(), allocOffset, size); 138 } 139 } 140 141 /** 142 * Mostly a duplicate of {@link #copyCellInto(Cell, int)}} done for perf sake. It presumes 143 * ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than the super 144 * generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline where before 145 * it was too big. Uses less CPU. See HBASE-20875 for evidence. 146 * @see #copyCellInto(Cell, int) 147 */ 148 private Cell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) { 149 int size = cell.getSerializedSize(); 150 Preconditions.checkArgument(size >= 0, "negative size"); 151 // Callers should satisfy large allocations from JVM heap so limit fragmentation. 152 if (size > maxAlloc) { 153 return null; 154 } 155 Chunk c = null; 156 int allocOffset = 0; 157 while (true) { 158 // Try to get the chunk 159 c = getOrMakeChunk(); 160 // We may get null because the some other thread succeeded in getting the lock 161 // and so the current thread has to try again to make its chunk or grab the chunk 162 // that the other thread created 163 // Try to allocate from this chunk 164 if (c != null) { 165 allocOffset = c.alloc(size); 166 if (allocOffset != -1) { 167 // We succeeded - this is the common case - small alloc 168 // from a big buffer 169 break; 170 } 171 // not enough space! 172 // try to retire this chunk 173 tryRetireChunk(c); 174 } 175 } 176 return copyBBECToChunkCell(cell, c.getData(), allocOffset, size); 177 } 178 179 /** 180 * @see #copyBBECellInto(ByteBufferExtendedCell, int) 181 */ 182 private Cell copyCellInto(Cell cell, int maxAlloc) { 183 int size = Segment.getCellLength(cell); 184 Preconditions.checkArgument(size >= 0, "negative size"); 185 // Callers should satisfy large allocations directly from JVM since they 186 // don't cause fragmentation as badly. 187 if (size > maxAlloc) { 188 return null; 189 } 190 Chunk c = null; 191 int allocOffset = 0; 192 while (true) { 193 // Try to get the chunk 194 c = getOrMakeChunk(); 195 // we may get null because the some other thread succeeded in getting the lock 196 // and so the current thread has to try again to make its chunk or grab the chunk 197 // that the other thread created 198 // Try to allocate from this chunk 199 if (c != null) { 200 allocOffset = c.alloc(size); 201 if (allocOffset != -1) { 202 // We succeeded - this is the common case - small alloc 203 // from a big buffer 204 break; 205 } 206 // not enough space! 207 // try to retire this chunk 208 tryRetireChunk(c); 209 } 210 } 211 return copyToChunkCell(cell, c.getData(), allocOffset, size); 212 } 213 214 /** 215 * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid 216 * out of it 217 * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int) 218 */ 219 private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) { 220 int tagsLen = cell.getTagsLength(); 221 if (cell instanceof ExtendedCell) { 222 ((ExtendedCell) cell).write(buf, offset); 223 } else { 224 // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the 225 // other case also. The data fragments within Cell is copied into buf as in KeyValue 226 // serialization format only. 227 KeyValueUtil.appendTo(cell, buf, offset, true); 228 } 229 return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId()); 230 } 231 232 /** 233 * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid 234 * out of it 235 * @see #copyToChunkCell(Cell, ByteBuffer, int, int) 236 */ 237 private static Cell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf, int offset, 238 int len) { 239 int tagsLen = cell.getTagsLength(); 240 cell.write(buf, offset); 241 return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId()); 242 } 243 244 private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen, 245 long sequenceId) { 246 // TODO : write the seqid here. For writing seqId we should create a new cell type so 247 // that seqId is not used as the state 248 if (tagsLen == 0) { 249 // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class 250 // which directly return tagsLen as 0. So we avoid parsing many length components in 251 // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell 252 // call getTagsLength(). 253 return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId); 254 } else { 255 return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId); 256 } 257 } 258 259 /** 260 * Close this instance since it won't be used any more, try to put the chunks back to pool 261 */ 262 @Override 263 public void close() { 264 if (!this.closed.compareAndSet(false, true)) { 265 return; 266 } 267 // We could put back the chunks to pool for reusing only when there is no 268 // opening scanner which will read their data 269 this.refCnt.release(); 270 } 271 272 @RestrictedApi(explanation = "Should only be called in tests", link = "", 273 allowedOnPath = ".*/src/test/.*") 274 int getRefCntValue() { 275 return this.refCnt.refCnt(); 276 } 277 278 /** 279 * Called when opening a scanner on the data of this MemStoreLAB 280 */ 281 @Override 282 public void incScannerCount() { 283 this.refCnt.retain(); 284 } 285 286 /** 287 * Called when closing a scanner on the data of this MemStoreLAB 288 */ 289 @Override 290 public void decScannerCount() { 291 this.refCnt.release(); 292 } 293 294 private void recycleChunks() { 295 if (reclaimed.compareAndSet(false, true)) { 296 chunkCreator.putbackChunks(chunks); 297 chunks.clear(); 298 } 299 } 300 301 /** 302 * Try to retire the current chunk if it is still <code>c</code>. Postcondition is that 303 * curChunk.get() != c 304 * @param c the chunk to retire 305 */ 306 private void tryRetireChunk(Chunk c) { 307 currChunk.compareAndSet(c, null); 308 // If the CAS succeeds, that means that we won the race 309 // to retire the chunk. We could use this opportunity to 310 // update metrics on external fragmentation. 311 // 312 // If the CAS fails, that means that someone else already 313 // retired the chunk for us. 314 } 315 316 /** 317 * Get the current chunk, or, if there is no current chunk, allocate a new one from the JVM. 318 */ 319 private Chunk getOrMakeChunk() { 320 // Try to get the chunk 321 Chunk c = currChunk.get(); 322 if (c != null) { 323 return c; 324 } 325 // No current chunk, so we want to allocate one. We race 326 // against other allocators to CAS in an uninitialized chunk 327 // (which is cheap to allocate) 328 if (lock.tryLock()) { 329 try { 330 // once again check inside the lock 331 c = currChunk.get(); 332 if (c != null) { 333 return c; 334 } 335 c = this.chunkCreator.getChunk(); 336 if (c != null) { 337 // set the curChunk. No need of CAS as only one thread will be here 338 currChunk.set(c); 339 chunks.add(c.getId()); 340 return c; 341 } 342 } finally { 343 lock.unlock(); 344 } 345 } 346 return null; 347 } 348 349 /* 350 * Returning a new pool chunk, without replacing current chunk, meaning MSLABImpl does not make 351 * the returned chunk as CurChunk. The space on this chunk will be allocated externally. The 352 * interface is only for external callers. 353 */ 354 @Override 355 public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) { 356 switch (chunkType) { 357 case INDEX_CHUNK: 358 case DATA_CHUNK: 359 Chunk c = this.chunkCreator.getChunk(chunkType); 360 chunks.add(c.getId()); 361 return c; 362 case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size 363 default: 364 return null; 365 } 366 } 367 368 /* 369 * Returning a new chunk, without replacing current chunk, meaning MSLABImpl does not make the 370 * returned chunk as CurChunk. The space on this chunk will be allocated externally. The interface 371 * is only for external callers. Chunks from pools are not allocated from here, since they have 372 * fixed sizes 373 */ 374 @Override 375 public Chunk getNewExternalChunk(int size) { 376 int allocSize = size + ChunkCreator.SIZEOF_CHUNK_HEADER; 377 if (allocSize <= ChunkCreator.getInstance().getChunkSize()) { 378 return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK); 379 } else { 380 Chunk c = this.chunkCreator.getJumboChunk(size); 381 chunks.add(c.getId()); 382 return c; 383 } 384 } 385 386 @Override 387 public boolean isOnHeap() { 388 return !isOffHeap(); 389 } 390 391 @Override 392 public boolean isOffHeap() { 393 return this.chunkCreator.isOffheap(); 394 } 395 396 Chunk getCurrentChunk() { 397 return currChunk.get(); 398 } 399 400 BlockingQueue<Chunk> getPooledChunks() { 401 BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>(); 402 for (Integer id : this.chunks) { 403 Chunk chunk = chunkCreator.getChunk(id); 404 if (chunk != null && chunk.isFromPool()) { 405 pooledChunks.add(chunk); 406 } 407 } 408 return pooledChunks; 409 } 410 411 Integer getNumOfChunksReturnedToPool(Set<Integer> chunksId) { 412 int i = 0; 413 for (Integer id : chunksId) { 414 if (chunkCreator.isChunkInPool(id)) { 415 i++; 416 } 417 } 418 return i; 419 } 420 421 boolean isReclaimed() { 422 return reclaimed.get(); 423 } 424 425 boolean isClosed() { 426 return closed.get(); 427 } 428}