/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore.IndexType;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * A memstore-local allocation buffer.
 * <p>
 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) byte[] chunks
 * and then doles out slices of those chunks to threads that request them.
 * <p>
 * The purpose of this class is to combat heap fragmentation in the regionserver. By ensuring that
 * all Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that
 * large blocks get freed up when the memstore is flushed.
 * <p>
 * Without the MSLAB, the byte arrays allocated during insertion end up interleaved throughout the
 * heap, and the old generation gets progressively more fragmented until a stop-the-world compacting
 * collection occurs.
 * <p>
 * TODO: we should probably benchmark whether word-aligning the allocations would provide a
 * performance improvement - probably would speed up the Bytes.toLong/Bytes.toInt calls in KeyValue,
 * but some of those are cached anyway.
 * <p>
 * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}. When a chunk
 * comes from the pool, it can be either an on-heap or an off-heap backed chunk. Chunks that this
 * MemStoreLAB creates on its own (when no chunk is available from the pool) are always heap
 * backed.
 * <p>
 * NOTE: if the user requests to work with MSLABs (whether on- or off-heap), in the
 * {@link CompactingMemStore} ctor, the {@link CompactingMemStore#indexType} can only be
 * {@link IndexType#CHUNK_MAP}; that is to say, the immutable segments using MSLABs are going to
 * use {@link CellChunkMap} as their index.
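 * <p>
 * A minimal usage sketch (illustrative only; the calling pattern is an assumption, given a
 * Configuration {@code conf} and an ExtendedCell {@code cell}, but every method shown is from
 * this class):
 *
 * <pre>{@code
 * MemStoreLAB lab = new MemStoreLABImpl(conf);
 * // copyCellInto returns null when the cell is bigger than maxAlloc; the
 * // caller then keeps the original, individually allocated cell instead.
 * ExtendedCell copied = lab.copyCellInto(cell);
 * ExtendedCell toAdd = (copied != null) ? copied : cell;
 * // ... add toAdd to the segment's index ...
 * // when the owning segment is done with the LAB:
 * lab.close();
 * }</pre>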
 */
@InterfaceAudience.Private
public class MemStoreLABImpl implements MemStoreLAB {

  static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class);

  private AtomicReference<Chunk> currChunk = new AtomicReference<>();
  // Lock to manage multiple handlers requesting a chunk
  private ReentrantLock lock = new ReentrantLock();

  // A set of chunks contained by this memstore LAB
  Set<Integer> chunks = new ConcurrentSkipListSet<Integer>();
  private final int dataChunkSize;
  private final int maxAlloc;
  private final ChunkCreator chunkCreator;

  // This flag is for closing this instance; it's set when clearing the snapshot of
  // the memstore
  private final AtomicBoolean closed = new AtomicBoolean(false);
  // This flag is for reclaiming chunks. It's set when putting chunks back to
  // the pool
  private final AtomicBoolean reclaimed = new AtomicBoolean(false);
  /**
   * Its initial value is 1, so it is one more than the current count of open scanners
   * reading data from this MemStoreLAB.
   */
  private final RefCnt refCnt;

  // Used in testing
  public MemStoreLABImpl() {
    this(new Configuration());
  }

  public MemStoreLABImpl(Configuration conf) {
    dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
    this.chunkCreator = ChunkCreator.getInstance();
    // if we don't exclude allocations > CHUNK_SIZE, we'd infinite-loop on one!
    Preconditions.checkArgument(maxAlloc <= dataChunkSize,
      MAX_ALLOC_KEY + " must be less than or equal to " + CHUNK_SIZE_KEY);

    this.refCnt = RefCnt.create(() -> {
      recycleChunks();
    });
  }

  @Override
  public ExtendedCell copyCellInto(ExtendedCell cell) {
    // See head of copyBBECellInto for how it differs from copyCellInto
    return (cell instanceof ByteBufferExtendedCell)
      ? copyBBECellInto((ByteBufferExtendedCell) cell, maxAlloc)
      : copyCellInto(cell, maxAlloc);
  }

  /**
   * When a cell's size is too big (bigger than maxAlloc), copyCellInto does not allocate it on
   * the MSLAB. Since the process of flattening to CellChunkMap assumes that all cells are
   * allocated on the MSLAB, during this process the big cells are copied into the MSLAB using
   * this method.
   */
  @Override
  public ExtendedCell forceCopyOfBigCellInto(ExtendedCell cell) {
    int size = Segment.getCellLength(cell);
    Preconditions.checkArgument(size >= 0, "negative size");
    if (size + ChunkCreator.SIZEOF_CHUNK_HEADER <= dataChunkSize) {
      // Use copyCellInto for cells which are bigger than the original maxAlloc
      return copyCellInto(cell, dataChunkSize);
    } else {
      Chunk c = getNewExternalChunk(size);
      int allocOffset = c.alloc(size);
      return copyToChunkCell(cell, c.getData(), allocOffset, size);
    }
  }

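  /*
   * Sizing example for the branch above (the numbers are an illustration assuming the default
   * configuration, where dataChunkSize is 2 MB and maxAlloc is considerably smaller; both are
   * configurable): a cell of, say, 1 MB is too big for the normal copyCellInto(cell, maxAlloc)
   * path, but even with the chunk header added it still fits a regular data chunk, so it is
   * copied via copyCellInto(cell, dataChunkSize). A cell bigger than a whole chunk (e.g. 3 MB)
   * is instead given a dedicated, appropriately sized jumbo chunk via getNewExternalChunk(size).
   */
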
  /**
   * Mostly a duplicate of {@link #copyCellInto(ExtendedCell, int)} done for perf's sake. It
   * presumes ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than
   * the super generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline
   * where before it was too big. Uses less CPU. See HBASE-20875 for evidence.
   * @see #copyCellInto(ExtendedCell, int)
   */
  private ExtendedCell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) {
    int size = cell.getSerializedSize();
    Preconditions.checkArgument(size >= 0, "negative size");
    // Callers should satisfy large allocations from the JVM heap, to limit fragmentation.
    if (size > maxAlloc) {
      return null;
    }
    Chunk c = null;
    int allocOffset = 0;
    while (true) {
      // Try to get the chunk
      c = getOrMakeChunk();
      // We may get null because some other thread succeeded in getting the lock
      // and so the current thread has to try again to make its chunk or grab the chunk
      // that the other thread created.
      // Try to allocate from this chunk
      if (c != null) {
        allocOffset = c.alloc(size);
        if (allocOffset != -1) {
          // We succeeded - this is the common case - small alloc
          // from a big buffer
          break;
        }
        // not enough space!
        // try to retire this chunk
        tryRetireChunk(c);
      }
    }
    return copyBBECToChunkCell(cell, c.getData(), allocOffset, size);
  }

  /**
   * @see #copyBBECellInto(ByteBufferExtendedCell, int)
   */
  private ExtendedCell copyCellInto(ExtendedCell cell, int maxAlloc) {
    int size = Segment.getCellLength(cell);
    Preconditions.checkArgument(size >= 0, "negative size");
    // Callers should satisfy large allocations directly from the JVM since they
    // don't cause fragmentation as badly.
    if (size > maxAlloc) {
      return null;
    }
    Chunk c = null;
    int allocOffset = 0;
    while (true) {
      // Try to get the chunk
      c = getOrMakeChunk();
      // We may get null because some other thread succeeded in getting the lock
      // and so the current thread has to try again to make its chunk or grab the chunk
      // that the other thread created.
      // Try to allocate from this chunk
      if (c != null) {
        allocOffset = c.alloc(size);
        if (allocOffset != -1) {
          // We succeeded - this is the common case - small alloc
          // from a big buffer
          break;
        }
        // not enough space!
        // try to retire this chunk
        tryRetireChunk(c);
      }
    }
    return copyToChunkCell(cell, c.getData(), allocOffset, size);
  }

  /**
   * Clone the passed cell by copying its data into the passed buf and create a cell with a
   * chunk id out of it
   * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int)
   */
  private static ExtendedCell copyToChunkCell(ExtendedCell cell, ByteBuffer buf, int offset,
    int len) {
    int tagsLen = cell.getTagsLength();
    cell.write(buf, offset);
    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
  }

  /**
   * Clone the passed cell by copying its data into the passed buf and create a cell with a
   * chunk id out of it
   * @see #copyToChunkCell(ExtendedCell, ByteBuffer, int, int)
   */
  private static ExtendedCell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf,
    int offset, int len) {
    int tagsLen = cell.getTagsLength();
    cell.write(buf, offset);
    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
  }

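  /*
   * In both helpers above, cell.write(buf, offset) serializes the cell into the chunk in
   * standard KeyValue layout (a sketch from the KeyValue docs; see KeyValue for the
   * authoritative format):
   *
   *   keyLen(4 bytes) | valueLen(4 bytes) | key | value | [tagsLen(2 bytes) | tags]
   *
   * createChunkCell then wraps the [offset, offset + len) slice of the chunk's buffer in a
   * cell implementation that carries the chunk id.
   */
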
  private static ExtendedCell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen,
    long sequenceId) {
    // TODO : write the seqid here. For writing seqId we should create a new cell type so
    // that seqId is not used as the state
    if (tagsLen == 0) {
      // When tagsLen is 0, make a NoTagByteBufferChunkKeyValue version. This is an optimized
      // class which directly returns tagsLen as 0. So we avoid parsing many length components
      // in reading the tagsLength stored in the backing buffer. The Memstore addition of every
      // Cell calls getTagsLength().
      return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId);
    } else {
      return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId);
    }
  }

  /**
   * Close this instance since it won't be used any more. Try to put the chunks back to the pool.
   */
  @Override
  public void close() {
    if (!this.closed.compareAndSet(false, true)) {
      return;
    }
    // We can put the chunks back to the pool for reuse only when there is no
    // open scanner which will read their data
    this.refCnt.release();
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  int getRefCntValue() {
    return this.refCnt.refCnt();
  }

  /**
   * Called when opening a scanner on the data of this MemStoreLAB
   */
  @Override
  public void incScannerCount() {
    this.refCnt.retain();
  }

  /**
   * Called when closing a scanner on the data of this MemStoreLAB
   */
  @Override
  public void decScannerCount() {
    this.refCnt.release();
  }

  private void recycleChunks() {
    if (reclaimed.compareAndSet(false, true)) {
      chunkCreator.putbackChunks(chunks);
      chunks.clear();
    }
  }

  /**
   * Try to retire the current chunk if it is still <code>c</code>. Postcondition is that
   * currChunk.get() != c
   * @param c the chunk to retire
   */
  private void tryRetireChunk(Chunk c) {
    currChunk.compareAndSet(c, null);
    // If the CAS succeeds, that means that we won the race
    // to retire the chunk. We could use this opportunity to
    // update metrics on external fragmentation.
    //
    // If the CAS fails, that means that someone else already
    // retired the chunk for us.
  }

  /**
   * Get the current chunk, or, if there is no current chunk, allocate a new one from the JVM.
   */
  private Chunk getOrMakeChunk() {
    // Try to get the chunk
    Chunk c = currChunk.get();
    if (c != null) {
      return c;
    }
    // No current chunk, so we want to allocate one. We race
    // against other allocators to CAS in an uninitialized chunk
    // (which is cheap to allocate)
    if (lock.tryLock()) {
      try {
        // once again check inside the lock
        c = currChunk.get();
        if (c != null) {
          return c;
        }
        c = this.chunkCreator.getChunk();
        if (c != null) {
          // set the currChunk. No need for CAS as only one thread will be here
          currChunk.set(c);
          chunks.add(c.getId());
          return c;
        }
      } finally {
        lock.unlock();
      }
    }
    return null;
  }

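  /*
   * For context on the allocation loops above: Chunk.alloc(size) is conceptually a CAS bump of
   * a next-free offset (an illustrative sketch, not the actual Chunk implementation):
   *
   *   while (true) {
   *     int oldOffset = nextFreeOffset.get();
   *     if (oldOffset + size > data.capacity()) {
   *       return -1; // no room left in this chunk; the caller retires it and retries
   *     }
   *     if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
   *       return oldOffset; // we own [oldOffset, oldOffset + size)
   *     }
   *     // lost the race to another allocating thread; retry
   *   }
   */
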
  /*
   * Return a new pool chunk without replacing the current chunk, meaning MSLABImpl does not make
   * the returned chunk its current chunk. The space in this chunk will be allocated externally.
   * This interface is only for external callers.
   */
  @Override
  public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) {
    switch (chunkType) {
      case INDEX_CHUNK:
      case DATA_CHUNK:
        Chunk c = this.chunkCreator.getChunk(chunkType);
        chunks.add(c.getId());
        return c;
      case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size
      default:
        return null;
    }
  }

  /*
   * Return a new chunk without replacing the current chunk, meaning MSLABImpl does not make the
   * returned chunk its current chunk. The space in this chunk will be allocated externally. This
   * interface is only for external callers. Jumbo chunks are not allocated from the pool, since
   * pooled chunks have fixed sizes.
   */
  @Override
  public Chunk getNewExternalChunk(int size) {
    int allocSize = size + ChunkCreator.SIZEOF_CHUNK_HEADER;
    if (allocSize <= ChunkCreator.getInstance().getChunkSize()) {
      return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK);
    } else {
      Chunk c = this.chunkCreator.getJumboChunk(size);
      chunks.add(c.getId());
      return c;
    }
  }

  @Override
  public boolean isOnHeap() {
    return !isOffHeap();
  }

  @Override
  public boolean isOffHeap() {
    return this.chunkCreator.isOffheap();
  }

  Chunk getCurrentChunk() {
    return currChunk.get();
  }

  BlockingQueue<Chunk> getPooledChunks() {
    BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
    for (Integer id : this.chunks) {
      Chunk chunk = chunkCreator.getChunk(id);
      if (chunk != null && chunk.isFromPool()) {
        pooledChunks.add(chunk);
      }
    }
    return pooledChunks;
  }

  Integer getNumOfChunksReturnedToPool(Set<Integer> chunksId) {
    int i = 0;
    for (Integer id : chunksId) {
      if (chunkCreator.isChunkInPool(id)) {
        i++;
      }
    }
    return i;
  }

  boolean isReclaimed() {
    return reclaimed.get();
  }

  boolean isClosed() {
    return closed.get();
  }
}