/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is
 * associated with every chunk.
 * <p>
 * Chunks are handed out either from one of two optional pools (a data chunks pool and an index
 * chunks pool) or, when no suitable pool exists or the pool is exhausted, by on-demand
 * allocation. Either pool may be absent ({@code null}); every method in this class tolerates
 * that.
 */
@InterfaceAudience.Private
public class ChunkCreator {
  private static final Logger LOG = LoggerFactory.getLogger(ChunkCreator.class);
  // monotonically increasing chunkid. Starts at 1.
  private final AtomicInteger chunkID = new AtomicInteger(1);
  // maps the chunk against the monotonically increasing chunk id. We need to preserve the
  // natural ordering of the key
  // CellChunkMap creation should convert the weak ref to hard reference

  // chunk id of each chunk is the first integer written on each chunk,
  // the header size need to be changed in case chunk id size is changed
  public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT;

  /**
   * Types of chunks, based on their sizes
   */
  public enum ChunkType {
    // An index chunk is a small chunk, allocated from the index chunks pool.
    // Its size is fixed: the data chunk size multiplied by the index chunk size percentage
    // given to the ChunkCreator c'tor (described as 10% by the original author).
    INDEX_CHUNK,
    // A data chunk is a regular chunk, allocated from the data chunks pool.
    // Its size is fixed and given as input to the ChunkCreator c'tor.
    DATA_CHUNK,
    // A jumbo chunk isn't allocated from pool. Its size is bigger than the size of a
    // data chunk, and is determined per chunk (meaning, there is no fixed jumbo size).
    JUMBO_CHUNK
  }

  // mapping from chunk IDs to chunks
  private final Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<>();

  private final boolean offheap;
  static ChunkCreator instance;
  static boolean chunkPoolDisabled = false;
  // may be null when the data pool is disabled or its share of the pool is not positive
  private MemStoreChunkPool dataChunksPool;
  private final int chunkSize;
  // may be null when the index pool is disabled or its share of the pool is not positive
  private MemStoreChunkPool indexChunksPool;

  ChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage,
      float initialCountPercentage, HeapMemoryManager heapMemoryManager,
      float indexChunkSizePercentage) {
    this.offheap = offheap;
    this.chunkSize = chunkSize; // in case pools are not allocated
    initializePools(chunkSize, globalMemStoreSize, poolSizePercentage, indexChunkSizePercentage,
      initialCountPercentage, heapMemoryManager);
  }

  /**
   * Creates the data and index pools. The overall pool share ({@code poolSizePercentage}) is
   * split between them according to {@code indexChunkSizePercentage}; the index chunk size is
   * {@code indexChunkSizePercentage * chunkSize}. Either pool ends up {@code null} when
   * {@link #initializePool} decides not to create it.
   */
  private void initializePools(int chunkSize, long globalMemStoreSize,
      float poolSizePercentage, float indexChunkSizePercentage,
      float initialCountPercentage,
      HeapMemoryManager heapMemoryManager) {
    this.dataChunksPool = initializePool("data", globalMemStoreSize,
      (1 - indexChunkSizePercentage) * poolSizePercentage,
      initialCountPercentage, chunkSize, heapMemoryManager);
    // The index chunks pool is needed only when the index type is CCM.
    // Since the pools are not created at all when the index type isn't CCM,
    // we don't need to check it here.
    this.indexChunksPool = initializePool("index", globalMemStoreSize,
      indexChunkSizePercentage * poolSizePercentage,
      initialCountPercentage, (int) (indexChunkSizePercentage * chunkSize),
      heapMemoryManager);
  }

  /**
   * Initializes the instance of ChunkCreator. If an instance already exists it is returned
   * unchanged and the arguments are ignored.
   * @param chunkSize the chunkSize
   * @param offheap indicates if the chunk is to be created offheap or not
   * @param globalMemStoreSize the global memstore size
   * @param poolSizePercentage pool size percentage
   * @param initialCountPercentage the initial count of the chunk pool if any
   * @param heapMemoryManager the heapmemory manager
   * @param indexChunkSizePercent fraction that determines both the share of the pool given to
   *          index chunks and the index chunk size relative to {@code chunkSize}
   * @return singleton MSLABChunkCreator
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
      justification = "Method is called by single thread at the starting of RS")
  public static ChunkCreator initialize(int chunkSize, boolean offheap, long globalMemStoreSize,
      float poolSizePercentage, float initialCountPercentage,
      HeapMemoryManager heapMemoryManager,
      float indexChunkSizePercent) {
    if (instance != null) {
      return instance;
    }
    instance = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
      initialCountPercentage, heapMemoryManager, indexChunkSizePercent);
    return instance;
  }

  /**
   * @return the singleton set by {@link #initialize}, or {@code null} if it was never called
   */
  public static ChunkCreator getInstance() {
    return instance;
  }

  /**
   * Creates and inits a chunk. The default implementation for a specific chunk size.
   * @param chunkType the type (and therefore size) of chunk requested
   * @return the chunk that was initialized
   */
  Chunk getChunk(ChunkType chunkType) {
    return getChunk(CompactingMemStore.IndexType.ARRAY_MAP, chunkType);
  }

  /**
   * Creates and inits a chunk. The default implementation.
   * @return the chunk that was initialized
   */
  Chunk getChunk() {
    return getChunk(CompactingMemStore.IndexType.ARRAY_MAP, ChunkType.DATA_CHUNK);
  }

  /**
   * Creates and inits a chunk. The default implementation for a specific index type.
   * @param chunkIndexType the index type the chunk is going to be used with
   * @return the chunk that was initialized
   */
  Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
    return getChunk(chunkIndexType, ChunkType.DATA_CHUNK);
  }

  /**
   * Creates and inits a chunk with specific index type and type.
   * @param chunkIndexType the index type the chunk is going to be used with
   * @param chunkType {@link ChunkType#INDEX_CHUNK} or {@link ChunkType#DATA_CHUNK}
   * @return the chunk that was initialized
   * @throws IllegalArgumentException for any other chunk type
   */
  Chunk getChunk(CompactingMemStore.IndexType chunkIndexType, ChunkType chunkType) {
    switch (chunkType) {
      case INDEX_CHUNK:
        if (indexChunksPool != null) {
          return getChunk(chunkIndexType, indexChunksPool.getChunkSize());
        }
        // no index pool: intentionally fall through and hand out a data-sized chunk
      case DATA_CHUNK:
        if (dataChunksPool == null) {
          return getChunk(chunkIndexType, chunkSize);
        } else {
          return getChunk(chunkIndexType, dataChunksPool.getChunkSize());
        }
      default:
        throw new IllegalArgumentException(
          "chunkType must either be INDEX_CHUNK or DATA_CHUNK");
    }
  }

  /**
   * Creates and inits a chunk.
   * @return the chunk that was initialized
   * @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
   * @param size the size of the chunk to be allocated, in bytes
   */
  Chunk getChunk(CompactingMemStore.IndexType chunkIndexType, int size) {
    Chunk chunk = null;
    MemStoreChunkPool pool = null;

    // if the size is suitable for one of the pools
    if (dataChunksPool != null && size == dataChunksPool.getChunkSize()) {
      pool = dataChunksPool;
    } else if (indexChunksPool != null && size == indexChunksPool.getChunkSize()) {
      pool = indexChunksPool;
    }

    // if we have a pool
    if (pool != null) {
      // the pool creates the chunk internally. The chunk#init() call happens here
      chunk = pool.getChunk();
      // the pool has run out of maxCount
      if (chunk == null) {
        LOG.trace("The chunk pool is full. Reached maxCount= {}. Creating chunk onheap.",
          pool.getMaxCount());
      }
    }

    if (chunk == null) {
      // the second parameter explains whether CellChunkMap index is requested,
      // in that case, put allocated on demand chunk mapping into chunkIdMap
      chunk = createChunk(false, chunkIndexType, size);
    }

    // now we need to actually do the expensive memory allocation step in case of a new chunk,
    // else only the offset is set to the beginning of the chunk to accept allocations
    chunk.init();
    return chunk;
  }

  /**
   * Creates and inits a chunk of a special size, bigger than a regular chunk size.
   * Such a chunk will never come from pool and will always be on demand allocated.
   * <p>
   * The regular size is obtained through {@link #getChunkSize()}, which falls back to the
   * configured chunk size when there is no data pool, so this method is safe to call when
   * pooling is disabled.
   * @return the chunk that was initialized
   * @param jumboSize the special size to be used
   */
  Chunk getJumboChunk(int jumboSize) {
    int allocSize = jumboSize + SIZEOF_CHUNK_HEADER;
    int regularSize = getChunkSize();
    if (allocSize <= regularSize) {
      LOG.warn("Jumbo chunk size {} must be more than regular chunk size {}. "
        + "Converting to regular chunk.", jumboSize, regularSize);
      return getChunk(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    // the new chunk is going to hold the jumbo cell data and needs to be referenced by
    // a strong map. Therefore the CCM index type
    return getChunk(CompactingMemStore.IndexType.CHUNK_MAP, allocSize);
  }

  /**
   * Creates the chunk either onheap or offheap
   * @param pool indicates if the chunks have to be created which will be used by the Pool
   * @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
   * @param size the size of the chunk to be allocated, in bytes
   * @return the chunk
   */
  private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType, int size) {
    Chunk chunk = null;
    int id = chunkID.getAndIncrement();
    assert id > 0;
    // do not create offheap chunk on demand
    if (pool && this.offheap) {
      chunk = new OffheapChunk(size, id, pool);
    } else {
      chunk = new OnheapChunk(size, id, pool);
    }
    if (pool || (chunkIndexType == CompactingMemStore.IndexType.CHUNK_MAP)) {
      // put the pool chunk into the chunkIdMap so it is not GC-ed
      this.chunkIdMap.put(chunk.getId(), chunk);
    }
    return chunk;
  }

  /**
   * Creates a new chunk on behalf of one of the pools.
   * @param chunkIndexType the index type the chunk is going to be used with
   * @param chunkSize the requested size; must match the chunk size of an existing pool
   * @return the new chunk, or {@code null} if {@code chunkSize} matches neither pool
   */
  // Chunks from pool are created covered with strong references anyway
  // TODO: change to CHUNK_MAP if it is generally defined
  private Chunk createChunkForPool(CompactingMemStore.IndexType chunkIndexType, int chunkSize) {
    // either pool may be absent, so only compare against pools that exist
    boolean matchesDataPool =
      dataChunksPool != null && chunkSize == dataChunksPool.getChunkSize();
    boolean matchesIndexPool =
      indexChunksPool != null && chunkSize == indexChunksPool.getChunkSize();
    if (!matchesDataPool && !matchesIndexPool) {
      return null;
    }
    return createChunk(true, chunkIndexType, chunkSize);
  }

  // Used to translate the ChunkID into a chunk ref
  Chunk getChunk(int id) {
    // can return null if chunk was never mapped
    return chunkIdMap.get(id);
  }

  boolean isOffheap() {
    return this.offheap;
  }

  private void removeChunks(Set<Integer> chunkIDs) {
    this.chunkIdMap.keySet().removeAll(chunkIDs);
  }

  Chunk removeChunk(int chunkId) {
    return this.chunkIdMap.remove(chunkId);
  }

  // the chunks in the chunkIdMap may already be released so we shouldn't rely
  // on this counting for strong correctness. This method is used only in testing.
  int numberOfMappedChunks() {
    return this.chunkIdMap.size();
  }

  void clearChunkIds() {
    this.chunkIdMap.clear();
  }

  /**
   * A pool of {@link Chunk} instances.
   *
   * MemStoreChunkPool caches a number of retired chunks for reusing, it could
   * decrease allocating bytes when writing, thereby optimizing the garbage
   * collection on JVM.
   */
  private class MemStoreChunkPool implements HeapMemoryTuneObserver {
    private final int chunkSize;
    private int maxCount;

    // A queue of reclaimed chunks
    private final BlockingQueue<Chunk> reclaimedChunks;
    private final float poolSizePercentage;

    /** Statistics thread schedule pool */
    private final ScheduledExecutorService scheduleThreadPool;
    /** Statistics thread period, in seconds */
    private static final int statThreadPeriod = 60 * 5;
    // number of chunks this pool has created so far (bounded by maxCount in getChunk)
    private final AtomicLong chunkCount = new AtomicLong();
    // number of times a reclaimed chunk was handed out again
    private final LongAdder reusedChunkCount = new LongAdder();
    private final String label;

    /**
     * Creates the pool, pre-allocates {@code initialCount} chunks into the reclaimed queue and
     * schedules the periodic statistics logging.
     */
    MemStoreChunkPool(String label, int chunkSize, int maxCount, int initialCount,
        float poolSizePercentage) {
      this.label = label;
      this.chunkSize = chunkSize;
      this.maxCount = maxCount;
      this.poolSizePercentage = poolSizePercentage;
      this.reclaimedChunks = new LinkedBlockingQueue<>();
      for (int i = 0; i < initialCount; i++) {
        Chunk chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP, chunkSize);
        chunk.init();
        reclaimedChunks.add(chunk);
      }
      chunkCount.set(initialCount);
      final String n = Thread.currentThread().getName();
      scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
        .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
      this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
        statThreadPeriod, TimeUnit.SECONDS);
    }

    /**
     * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we
     * have not yet created max allowed chunks count. When we have already created max allowed
     * chunks and no free chunks as of now, return null. It is the responsibility of the caller to
     * make a chunk then.
     * Note: Chunks returned by this pool must be put back to the pool after its use.
     * @return a chunk
     * @see #putbackChunks(Chunk)
     */
    Chunk getChunk() {
      return getChunk(CompactingMemStore.IndexType.ARRAY_MAP);
    }

    /**
     * Same contract as {@link #getChunk()}, for a specific index type.
     * @param chunkIndexType the index type passed on when a new chunk has to be created
     * @return a reused or newly created chunk, or {@code null} when the pool is exhausted
     */
    Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
      Chunk chunk = reclaimedChunks.poll();
      if (chunk != null) {
        chunk.reset();
        reusedChunkCount.increment();
      } else {
        // Make a chunk iff we have not yet created the maxCount chunks
        while (true) {
          long created = this.chunkCount.get();
          if (created < this.maxCount) {
            // reserve a slot first; retry the loop if another thread won the race
            if (this.chunkCount.compareAndSet(created, created + 1)) {
              chunk = createChunkForPool(chunkIndexType, chunkSize);
              break;
            }
          } else {
            break;
          }
        }
      }
      return chunk;
    }

    int getChunkSize() {
      return chunkSize;
    }

    /**
     * Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining
     * chunks
     * @param c the chunk being returned; it is queued for reuse only if it is from the pool,
     *          has this pool's chunk size and the queue is below the max count, otherwise it is
     *          removed from the chunk id map
     */
    private void putbackChunks(Chunk c) {
      int toAdd = this.maxCount - reclaimedChunks.size();
      if (c.isFromPool() && c.size == chunkSize && toAdd > 0) {
        reclaimedChunks.add(c);
      } else {
        // remove the chunk (that is not going to pool)
        // though it is initially from the pool or not
        ChunkCreator.this.removeChunk(c.getId());
      }
    }

    /**
     * Periodic task that logs the pool statistics at debug level. It is handed to the scheduled
     * executor and run through {@link #run()}.
     */
    private class StatisticsThread extends Thread {
      StatisticsThread() {
        super("MemStoreChunkPool.StatisticsThread");
        setDaemon(true);
      }

      @Override
      public void run() {
        logStats();
      }

      private void logStats() {
        if (!LOG.isDebugEnabled()) {
          return;
        }
        long created = chunkCount.get();
        long reused = reusedChunkCount.sum();
        long total = created + reused;
        LOG.debug("{} stats (chunk size={}): current pool size={}, created chunk count={}, " +
            "reused chunk count={}, reuseRatio={}", label, chunkSize, reclaimedChunks.size(),
          created, reused,
          (total == 0 ? "0" : StringUtils.formatPercent((float) reused / (float) total, 2)));
      }
    }

    private int getMaxCount() {
      return this.maxCount;
    }

    /**
     * Recomputes the max chunk count from the new memstore size. Growing only raises the limit;
     * shrinking also drops surplus chunks from the reclaimed queue. Nothing is done for an
     * offheap memstore.
     */
    @Override
    public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
      // don't do any tuning in case of offheap memstore
      if (isOffheap()) {
        LOG.warn("{} not tuning the chunk pool as it is offheap", label);
        return;
      }
      int newMaxCount =
        (int) (newMemstoreSize * poolSizePercentage / getChunkSize());
      if (newMaxCount != this.maxCount) {
        // We need an adjustment in the chunks numbers
        if (newMaxCount > this.maxCount) {
          // Max chunks getting increased. Just change the variable. Later calls to getChunk()
          // would create and add them to Q
          LOG.info("{} max count for chunks increased from {} to {}", this.label, this.maxCount,
            newMaxCount);
          this.maxCount = newMaxCount;
        } else {
          // Max chunks getting decreased. We may need to clear off some of the pooled chunks now
          // itself. If the extra chunks are serving already, do not pool those when we get them
          // back
          LOG.info("{} max count for chunks decreased from {} to {}", this.label, this.maxCount,
            newMaxCount);
          this.maxCount = newMaxCount;
          if (this.reclaimedChunks.size() > newMaxCount) {
            synchronized (this) {
              while (this.reclaimedChunks.size() > newMaxCount) {
                this.reclaimedChunks.poll();
              }
            }
          }
        }
      }
    }
  }

  static void clearDisableFlag() {
    chunkPoolDisabled = false;
  }

  /**
   * Creates one chunk pool and registers it with the heap memory manager, if one is given.
   * @param label name of the pool, used in log messages
   * @param globalMemStoreSize the global memstore size
   * @param poolSizePercentage share of the global memstore size this pool may occupy
   * @param initialCountPercentage share of the max chunk count to pre-allocate
   * @param chunkSize size of each chunk of this pool, in bytes
   * @param heapMemoryManager the heap memory manager, may be {@code null}
   * @return the pool, or {@code null} when {@code poolSizePercentage} is not positive or chunk
   *         pools are disabled
   * @throws IllegalArgumentException if a percentage is out of the [0.0, 1.0] range
   */
  private MemStoreChunkPool initializePool(String label, long globalMemStoreSize,
      float poolSizePercentage, float initialCountPercentage, int chunkSize,
      HeapMemoryManager heapMemoryManager) {
    if (poolSizePercentage <= 0) {
      LOG.info("{} poolSizePercentage is less than 0. So not using pool", label);
      return null;
    }
    if (chunkPoolDisabled) {
      return null;
    }
    if (poolSizePercentage > 1.0) {
      throw new IllegalArgumentException(
        MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
    }
    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
      throw new IllegalArgumentException(label + " " + MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY +
        " must be between 0.0 and 1.0");
    }
    int initialCount = (int) (initialCountPercentage * maxCount);
    LOG.info("Allocating {} MemStoreChunkPool with chunk size {}, max count {}, initial count {}",
      label, StringUtils.byteDesc(chunkSize), maxCount, initialCount);
    MemStoreChunkPool memStoreChunkPool = new MemStoreChunkPool(label, chunkSize, maxCount,
      initialCount, poolSizePercentage);
    if (heapMemoryManager != null) {
      // Register with Heap Memory manager
      heapMemoryManager.registerTuneObserver(memStoreChunkPool);
    }
    return memStoreChunkPool;
  }

  int getMaxCount() {
    return getMaxCount(ChunkType.DATA_CHUNK);
  }

  /**
   * @param chunkType {@link ChunkType#INDEX_CHUNK} or {@link ChunkType#DATA_CHUNK}
   * @return the max chunk count of the matching pool, or 0 if that pool does not exist
   * @throws IllegalArgumentException for any other chunk type
   */
  int getMaxCount(ChunkType chunkType) {
    switch (chunkType) {
      case INDEX_CHUNK:
        if (indexChunksPool != null) {
          return indexChunksPool.getMaxCount();
        }
        break;
      case DATA_CHUNK:
        if (dataChunksPool != null) {
          return dataChunksPool.getMaxCount();
        }
        break;
      default:
        throw new IllegalArgumentException(
          "chunkType must either be INDEX_CHUNK or DATA_CHUNK");
    }

    return 0;
  }

  int getPoolSize() {
    return getPoolSize(ChunkType.DATA_CHUNK);
  }

  /**
   * @param chunkType {@link ChunkType#INDEX_CHUNK} or {@link ChunkType#DATA_CHUNK}
   * @return the number of reclaimed chunks currently queued in the matching pool, or 0 if that
   *         pool does not exist
   * @throws IllegalArgumentException for any other chunk type
   */
  int getPoolSize(ChunkType chunkType) {
    switch (chunkType) {
      case INDEX_CHUNK:
        if (indexChunksPool != null) {
          return indexChunksPool.reclaimedChunks.size();
        }
        break;
      case DATA_CHUNK:
        if (dataChunksPool != null) {
          return dataChunksPool.reclaimedChunks.size();
        }
        break;
      default:
        throw new IllegalArgumentException(
          "chunkType must either be INDEX_CHUNK or DATA_CHUNK");
    }
    return 0;
  }

  /**
   * @param chunkId id of the chunk to look up
   * @return true if the chunk is mapped in the chunk id map and currently sits in the reclaimed
   *         queue of one of the pools
   */
  boolean isChunkInPool(int chunkId) {
    Chunk c = getChunk(chunkId);
    if (c == null) {
      return false;
    }

    // chunks that are from pool will return true chunk reference not null
    if (dataChunksPool != null && dataChunksPool.reclaimedChunks.contains(c)) {
      return true;
    } else if (indexChunksPool != null && indexChunksPool.reclaimedChunks.contains(c)) {
      return true;
    }
    return false;
  }

  /*
   * Only used in testing
   */
  void clearChunksInPool() {
    if (dataChunksPool != null) {
      dataChunksPool.reclaimedChunks.clear();
    }
    if (indexChunksPool != null) {
      indexChunksPool.reclaimedChunks.clear();
    }
  }

  int getChunkSize() {
    return getChunkSize(ChunkType.DATA_CHUNK);
  }

  /**
   * @param chunkType {@link ChunkType#INDEX_CHUNK} or {@link ChunkType#DATA_CHUNK}
   * @return the chunk size of the matching pool; an index request without an index pool is
   *         answered with the data chunk size, and without a data pool the configured chunk size
   *         is returned
   * @throws IllegalArgumentException for any other chunk type
   */
  int getChunkSize(ChunkType chunkType) {
    switch (chunkType) {
      case INDEX_CHUNK:
        if (indexChunksPool != null) {
          return indexChunksPool.getChunkSize();
        }
        // no index pool: intentionally fall through to the data chunk size
      case DATA_CHUNK:
        if (dataChunksPool != null) {
          return dataChunksPool.getChunkSize();
        } else { // When pools are empty
          return chunkSize;
        }
      default:
        throw new IllegalArgumentException(
          "chunkType must either be INDEX_CHUNK or DATA_CHUNK");
    }
  }

  /**
   * Releases the given chunks: pooled chunks go back to their pool, all other mapped chunks are
   * dropped from the chunk id map.
   * @param chunks ids of the chunks being released
   */
  synchronized void putbackChunks(Set<Integer> chunks) {
    // if there is no pool just try to clear the chunkIdMap in case there is something
    if (dataChunksPool == null && indexChunksPool == null) {
      this.removeChunks(chunks);
      return;
    }

    // if there is a pool, go over all chunk IDs that came back, the chunks may be from pool or not
    for (int id : chunks) {
      // translate chunk ID to chunk, if chunk initially wasn't in pool
      // this translation will (most likely) return null
      Chunk chunk = ChunkCreator.this.getChunk(id);
      if (chunk != null) {
        // only one of the two pools may exist here, so guard each pool before using it
        if (indexChunksPool != null && chunk.isFromPool() && chunk.isIndexChunk()) {
          indexChunksPool.putbackChunks(chunk);
        } else if (dataChunksPool != null && chunk.isFromPool()
            && chunk.size == dataChunksPool.getChunkSize()) {
          dataChunksPool.putbackChunks(chunk);
        } else {
          // chunks which are not from one of the pools
          // should be released without going to the pools.
          // Removing them from chunkIdMap will cause their removal by the GC.
          this.removeChunk(id);
        }
      }
      // if chunk is null, it was never covered by the chunkIdMap (and so wasn't in pool also),
      // so we have nothing to do on its release
    }
  }

}