001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.util.Map;
021import java.util.Set;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.Executors;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.ScheduledExecutorService;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicInteger;
029import java.util.concurrent.atomic.AtomicLong;
030import java.util.concurrent.atomic.LongAdder;
031import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.util.StringUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
039
/**
 * Manages the creation of MemStoreLAB chunks. Every chunk is associated with a monotonically
 * increasing id.
 */
044@InterfaceAudience.Private
045public class ChunkCreator {
046  private static final Logger LOG = LoggerFactory.getLogger(ChunkCreator.class);
047  // monotonically increasing chunkid. Starts at 1.
048  private AtomicInteger chunkID = new AtomicInteger(1);
049  // maps the chunk against the monotonically increasing chunk id. We need to preserve the
050  // natural ordering of the key
051  // CellChunkMap creation should convert the weak ref to hard reference
052
053  // chunk id of each chunk is the first integer written on each chunk,
054  // the header size need to be changed in case chunk id size is changed
055  public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT;
056
057  /**
058   * Types of chunks, based on their sizes
059   */
060  public enum ChunkType {
061    // An index chunk is a small chunk, allocated from the index chunks pool.
062    // Its size is fixed and is 10% of the size of a data chunk.
063    INDEX_CHUNK,
064    // A data chunk is a regular chunk, allocated from the data chunks pool.
065    // Its size is fixed and given as input to the ChunkCreator c'tor.
066    DATA_CHUNK,
067    // A jumbo chunk isn't allocated from pool. Its size is bigger than the size of a
068    // data chunk, and is determined per chunk (meaning, there is no fixed jumbo size).
069    JUMBO_CHUNK
070  }
071
072  // mapping from chunk IDs to chunks
073  private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, Chunk>();
074
075  private final boolean offheap;
076  static ChunkCreator instance;
077  static boolean chunkPoolDisabled = false;
078  private MemStoreChunkPool dataChunksPool;
079  private final int chunkSize;
080  private MemStoreChunkPool indexChunksPool;
081
082  ChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage,
083               float initialCountPercentage, HeapMemoryManager heapMemoryManager,
084               float indexChunkSizePercentage) {
085    this.offheap = offheap;
086    this.chunkSize = chunkSize; // in case pools are not allocated
087    initializePools(chunkSize, globalMemStoreSize, poolSizePercentage, indexChunkSizePercentage,
088            initialCountPercentage, heapMemoryManager);
089  }
090
091  private void initializePools(int chunkSize, long globalMemStoreSize,
092                               float poolSizePercentage, float indexChunkSizePercentage,
093                               float initialCountPercentage,
094                               HeapMemoryManager heapMemoryManager) {
095    this.dataChunksPool = initializePool("data", globalMemStoreSize,
096            (1 - indexChunkSizePercentage) * poolSizePercentage,
097            initialCountPercentage, chunkSize, heapMemoryManager);
098    // The index chunks pool is needed only when the index type is CCM.
099    // Since the pools are not created at all when the index type isn't CCM,
100    // we don't need to check it here.
101    this.indexChunksPool = initializePool("index", globalMemStoreSize,
102            indexChunkSizePercentage * poolSizePercentage,
103            initialCountPercentage, (int) (indexChunkSizePercentage * chunkSize),
104            heapMemoryManager);
105  }
106
107  /**
108   * Initializes the instance of ChunkCreator
109   * @param chunkSize the chunkSize
110   * @param offheap indicates if the chunk is to be created offheap or not
111   * @param globalMemStoreSize  the global memstore size
112   * @param poolSizePercentage pool size percentage
113   * @param initialCountPercentage the initial count of the chunk pool if any
114   * @param heapMemoryManager the heapmemory manager
115   * @return singleton MSLABChunkCreator
116   */
117  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
118          justification = "Method is called by single thread at the starting of RS")
119  public static ChunkCreator initialize(int chunkSize, boolean offheap, long globalMemStoreSize,
120                                        float poolSizePercentage, float initialCountPercentage,
121                                        HeapMemoryManager heapMemoryManager,
122                                        float indexChunkSizePercent) {
123    if (instance != null) {
124      return instance;
125    }
126    instance = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
127            initialCountPercentage, heapMemoryManager, indexChunkSizePercent);
128    return instance;
129  }
130
131  public static ChunkCreator getInstance() {
132    return instance;
133  }
134
135  /**
136   * Creates and inits a chunk. The default implementation for a specific chunk size.
137   * @return the chunk that was initialized
138   */
139  Chunk getChunk(ChunkType chunkType) {
140    return getChunk(CompactingMemStore.IndexType.ARRAY_MAP, chunkType);
141  }
142
143  /**
144   * Creates and inits a chunk. The default implementation.
145   * @return the chunk that was initialized
146   */
147  Chunk getChunk() {
148    return getChunk(CompactingMemStore.IndexType.ARRAY_MAP, ChunkType.DATA_CHUNK);
149  }
150
151  /**
152   * Creates and inits a chunk. The default implementation for a specific index type.
153   * @return the chunk that was initialized
154   */
155  Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
156    return getChunk(chunkIndexType, ChunkType.DATA_CHUNK);
157  }
158
159  /**
160   * Creates and inits a chunk with specific index type and type.
161   * @return the chunk that was initialized
162   */
163  Chunk getChunk(CompactingMemStore.IndexType chunkIndexType, ChunkType chunkType) {
164    switch (chunkType) {
165      case INDEX_CHUNK:
166        if (indexChunksPool != null) {
167          return getChunk(chunkIndexType, indexChunksPool.getChunkSize());
168        }
169      case DATA_CHUNK:
170        if (dataChunksPool == null) {
171          return getChunk(chunkIndexType, chunkSize);
172        } else {
173          return getChunk(chunkIndexType, dataChunksPool.getChunkSize());
174        }
175      default:
176        throw new IllegalArgumentException(
177                "chunkType must either be INDEX_CHUNK or DATA_CHUNK");
178    }
179  }
180
181  /**
182   * Creates and inits a chunk.
183   * @return the chunk that was initialized
184   * @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
185   * @param size the size of the chunk to be allocated, in bytes
186   */
187  Chunk getChunk(CompactingMemStore.IndexType chunkIndexType, int size) {
188    Chunk chunk = null;
189    MemStoreChunkPool pool = null;
190
191    // if the size is suitable for one of the pools
192    if (dataChunksPool != null && size == dataChunksPool.getChunkSize()) {
193      pool = dataChunksPool;
194    } else if (indexChunksPool != null && size == indexChunksPool.getChunkSize()) {
195      pool = indexChunksPool;
196    }
197
198    // if we have a pool
199    if (pool != null) {
200      //  the pool creates the chunk internally. The chunk#init() call happens here
201      chunk = pool.getChunk();
202      // the pool has run out of maxCount
203      if (chunk == null) {
204        if (LOG.isTraceEnabled()) {
205          LOG.trace("The chunk pool is full. Reached maxCount= " + pool.getMaxCount()
206                  + ". Creating chunk onheap.");
207        }
208      }
209    }
210
211    if (chunk == null) {
212      // the second parameter explains whether CellChunkMap index is requested,
213      // in that case, put allocated on demand chunk mapping into chunkIdMap
214      chunk = createChunk(false, chunkIndexType, size);
215    }
216
217    // now we need to actually do the expensive memory allocation step in case of a new chunk,
218    // else only the offset is set to the beginning of the chunk to accept allocations
219    chunk.init();
220    return chunk;
221  }
222
223  /**
224   * Creates and inits a chunk of a special size, bigger than a regular chunk size.
225   * Such a chunk will never come from pool and will always be on demand allocated.
226   * @return the chunk that was initialized
227   * @param jumboSize the special size to be used
228   */
229  Chunk getJumboChunk(int jumboSize) {
230    int allocSize = jumboSize + SIZEOF_CHUNK_HEADER;
231    if (allocSize <= dataChunksPool.getChunkSize()) {
232      LOG.warn("Jumbo chunk size " + jumboSize + " must be more than regular chunk size "
233              + dataChunksPool.getChunkSize() + ". Converting to regular chunk.");
234      return getChunk(CompactingMemStore.IndexType.CHUNK_MAP);
235    }
236    // the new chunk is going to hold the jumbo cell data and needs to be referenced by
237    // a strong map. Therefore the CCM index type
238    return getChunk(CompactingMemStore.IndexType.CHUNK_MAP, allocSize);
239  }
240
241  /**
242   * Creates the chunk either onheap or offheap
243   * @param pool indicates if the chunks have to be created which will be used by the Pool
244   * @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
245   * @param size the size of the chunk to be allocated, in bytes
246   * @return the chunk
247   */
248  private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType, int size) {
249    Chunk chunk = null;
250    int id = chunkID.getAndIncrement();
251    assert id > 0;
252    // do not create offheap chunk on demand
253    if (pool && this.offheap) {
254      chunk = new OffheapChunk(size, id, pool);
255    } else {
256      chunk = new OnheapChunk(size, id, pool);
257    }
258    if (pool || (chunkIndexType == CompactingMemStore.IndexType.CHUNK_MAP)) {
259      // put the pool chunk into the chunkIdMap so it is not GC-ed
260      this.chunkIdMap.put(chunk.getId(), chunk);
261    }
262    return chunk;
263  }
264
265  // Chunks from pool are created covered with strong references anyway
266  // TODO: change to CHUNK_MAP if it is generally defined
267  private Chunk createChunkForPool(CompactingMemStore.IndexType chunkIndexType, int chunkSize) {
268    if (chunkSize != dataChunksPool.getChunkSize() &&
269            chunkSize != indexChunksPool.getChunkSize()) {
270      return null;
271    }
272    return createChunk(true, chunkIndexType, chunkSize);
273  }
274
275  // Used to translate the ChunkID into a chunk ref
276  Chunk getChunk(int id) {
277    // can return null if chunk was never mapped
278    return chunkIdMap.get(id);
279  }
280
281  boolean isOffheap() {
282    return this.offheap;
283  }
284
285  private void removeChunks(Set<Integer> chunkIDs) {
286    this.chunkIdMap.keySet().removeAll(chunkIDs);
287  }
288
289  Chunk removeChunk(int chunkId) {
290    return this.chunkIdMap.remove(chunkId);
291  }
292
293  // the chunks in the chunkIdMap may already be released so we shouldn't relay
294  // on this counting for strong correctness. This method is used only in testing.
295  int numberOfMappedChunks() {
296    return this.chunkIdMap.size();
297  }
298
299  void clearChunkIds() {
300    this.chunkIdMap.clear();
301  }
302
303  /**
304   * A pool of {@link Chunk} instances.
305   *
306   * MemStoreChunkPool caches a number of retired chunks for reusing, it could
307   * decrease allocating bytes when writing, thereby optimizing the garbage
308   * collection on JVM.
309   */
310  private  class MemStoreChunkPool implements HeapMemoryTuneObserver {
311    private final int chunkSize;
312    private int maxCount;
313
314    // A queue of reclaimed chunks
315    private final BlockingQueue<Chunk> reclaimedChunks;
316    private final float poolSizePercentage;
317
318    /** Statistics thread schedule pool */
319    private final ScheduledExecutorService scheduleThreadPool;
320    /** Statistics thread */
321    private static final int statThreadPeriod = 60 * 5;
322    private final AtomicLong chunkCount = new AtomicLong();
323    private final LongAdder reusedChunkCount = new LongAdder();
324    private final String label;
325
326    MemStoreChunkPool(String label, int chunkSize, int maxCount, int initialCount,
327        float poolSizePercentage) {
328      this.label = label;
329      this.chunkSize = chunkSize;
330      this.maxCount = maxCount;
331      this.poolSizePercentage = poolSizePercentage;
332      this.reclaimedChunks = new LinkedBlockingQueue<>();
333      for (int i = 0; i < initialCount; i++) {
334        Chunk chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP, chunkSize);
335        chunk.init();
336        reclaimedChunks.add(chunk);
337      }
338      chunkCount.set(initialCount);
339      final String n = Thread.currentThread().getName();
340      scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
341              .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build());
342      this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod,
343              statThreadPeriod, TimeUnit.SECONDS);
344    }
345
346    /**
347     * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have
348     * not yet created max allowed chunks count. When we have already created max allowed chunks and
349     * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk
350     * then.
351     * Note: Chunks returned by this pool must be put back to the pool after its use.
352     * @return a chunk
353     * @see #putbackChunks(Chunk)
354     */
355    Chunk getChunk() {
356      return getChunk(CompactingMemStore.IndexType.ARRAY_MAP);
357    }
358
359    Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
360      Chunk chunk = reclaimedChunks.poll();
361      if (chunk != null) {
362        chunk.reset();
363        reusedChunkCount.increment();
364      } else {
365        // Make a chunk iff we have not yet created the maxCount chunks
366        while (true) {
367          long created = this.chunkCount.get();
368          if (created < this.maxCount) {
369            if (this.chunkCount.compareAndSet(created, created + 1)) {
370              chunk = createChunkForPool(chunkIndexType, chunkSize);
371              break;
372            }
373          } else {
374            break;
375          }
376        }
377      }
378      return chunk;
379    }
380
381    int getChunkSize() {
382      return chunkSize;
383    }
384
385    /**
386     * Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining
387     * chunks
388     * @param c
389     */
390    private void putbackChunks(Chunk c) {
391      int toAdd = this.maxCount - reclaimedChunks.size();
392      if (c.isFromPool() && c.size == chunkSize && toAdd > 0) {
393        reclaimedChunks.add(c);
394      } else {
395        // remove the chunk (that is not going to pool)
396        // though it is initially from the pool or not
397        ChunkCreator.this.removeChunk(c.getId());
398      }
399    }
400
401    private class StatisticsThread extends Thread {
402      StatisticsThread() {
403        super("MemStoreChunkPool.StatisticsThread");
404        setDaemon(true);
405      }
406
407      @Override
408      public void run() {
409        logStats();
410      }
411
412      private void logStats() {
413        if (!LOG.isDebugEnabled()) return;
414        long created = chunkCount.get();
415        long reused = reusedChunkCount.sum();
416        long total = created + reused;
417        LOG.debug("{} stats (chunk size={}): current pool size={}, created chunk count={}, " +
418                "reused chunk count={}, reuseRatio={}", label, chunkSize, reclaimedChunks.size(),
419            created, reused,
420            (total == 0? "0": StringUtils.formatPercent((float)reused/(float)total,2)));
421      }
422    }
423
424    private int getMaxCount() {
425      return this.maxCount;
426    }
427
428    @Override
429    public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
430      // don't do any tuning in case of offheap memstore
431      if (isOffheap()) {
432        LOG.warn("{} not tuning the chunk pool as it is offheap", label);
433        return;
434      }
435      int newMaxCount =
436              (int) (newMemstoreSize * poolSizePercentage / getChunkSize());
437      if (newMaxCount != this.maxCount) {
438        // We need an adjustment in the chunks numbers
439        if (newMaxCount > this.maxCount) {
440          // Max chunks getting increased. Just change the variable. Later calls to getChunk() would
441          // create and add them to Q
442          LOG.info("{} max count for chunks increased from {} to {}", this.label, this.maxCount,
443              newMaxCount);
444          this.maxCount = newMaxCount;
445        } else {
446          // Max chunks getting decreased. We may need to clear off some of the pooled chunks now
447          // itself. If the extra chunks are serving already, do not pool those when we get them back
448          LOG.info("{} max count for chunks decreased from {} to {}", this.label, this.maxCount,
449              newMaxCount);
450          this.maxCount = newMaxCount;
451          if (this.reclaimedChunks.size() > newMaxCount) {
452            synchronized (this) {
453              while (this.reclaimedChunks.size() > newMaxCount) {
454                this.reclaimedChunks.poll();
455              }
456            }
457          }
458        }
459      }
460    }
461  }
462
463  static void clearDisableFlag() {
464    chunkPoolDisabled = false;
465  }
466
467  private MemStoreChunkPool initializePool(String label, long globalMemStoreSize,
468      float poolSizePercentage, float initialCountPercentage, int chunkSize,
469      HeapMemoryManager heapMemoryManager) {
470    if (poolSizePercentage <= 0) {
471      LOG.info("{} poolSizePercentage is less than 0. So not using pool", label);
472      return null;
473    }
474    if (chunkPoolDisabled) {
475      return null;
476    }
477    if (poolSizePercentage > 1.0) {
478      throw new IllegalArgumentException(
479              MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
480    }
481    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
482    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
483      throw new IllegalArgumentException(label + " " + MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY +
484          " must be between 0.0 and 1.0");
485    }
486    int initialCount = (int) (initialCountPercentage * maxCount);
487    LOG.info("Allocating {} MemStoreChunkPool with chunk size {}, max count {}, initial count {}",
488        label, StringUtils.byteDesc(chunkSize), maxCount, initialCount);
489    MemStoreChunkPool memStoreChunkPool = new MemStoreChunkPool(label, chunkSize, maxCount,
490            initialCount, poolSizePercentage);
491    if (heapMemoryManager != null && memStoreChunkPool != null) {
492      // Register with Heap Memory manager
493      heapMemoryManager.registerTuneObserver(memStoreChunkPool);
494    }
495    return memStoreChunkPool;
496  }
497
498  int getMaxCount() {
499    return getMaxCount(ChunkType.DATA_CHUNK);
500  }
501
502  int getMaxCount(ChunkType chunkType) {
503    switch (chunkType) {
504      case INDEX_CHUNK:
505        if (indexChunksPool != null) {
506          return indexChunksPool.getMaxCount();
507        }
508        break;
509      case DATA_CHUNK:
510        if (dataChunksPool != null) {
511          return dataChunksPool.getMaxCount();
512        }
513        break;
514      default:
515        throw new IllegalArgumentException(
516                "chunkType must either be INDEX_CHUNK or DATA_CHUNK");
517    }
518
519    return 0;
520  }
521
522  int getPoolSize() {
523    return getPoolSize(ChunkType.DATA_CHUNK);
524  }
525
526  int getPoolSize(ChunkType chunkType) {
527    switch (chunkType) {
528      case INDEX_CHUNK:
529        if (indexChunksPool != null) {
530          return indexChunksPool.reclaimedChunks.size();
531        }
532        break;
533      case DATA_CHUNK:
534        if (dataChunksPool != null) {
535          return dataChunksPool.reclaimedChunks.size();
536        }
537        break;
538      default:
539        throw new IllegalArgumentException(
540                "chunkType must either be INDEX_CHUNK or DATA_CHUNK");
541    }
542    return 0;
543  }
544
545  boolean isChunkInPool(int chunkId) {
546    Chunk c = getChunk(chunkId);
547    if (c==null) {
548      return false;
549    }
550
551    // chunks that are from pool will return true chunk reference not null
552    if (dataChunksPool != null && dataChunksPool.reclaimedChunks.contains(c)) {
553      return true;
554    } else if (indexChunksPool != null && indexChunksPool.reclaimedChunks.contains(c)) {
555      return true;
556    }
557    return false;
558  }
559
560  /*
561   * Only used in testing
562   */
563  void clearChunksInPool() {
564    if (dataChunksPool != null) {
565      dataChunksPool.reclaimedChunks.clear();
566    }
567    if (indexChunksPool != null) {
568      indexChunksPool.reclaimedChunks.clear();
569    }
570  }
571
572  int getChunkSize() {
573    return getChunkSize(ChunkType.DATA_CHUNK);
574  }
575
576  int getChunkSize(ChunkType chunkType) {
577    switch (chunkType) {
578      case INDEX_CHUNK:
579        if (indexChunksPool != null) {
580          return indexChunksPool.getChunkSize();
581        }
582      case DATA_CHUNK:
583        if (dataChunksPool != null) {
584          return dataChunksPool.getChunkSize();
585        } else { // When pools are empty
586          return chunkSize;
587        }
588      default:
589        throw new IllegalArgumentException(
590                "chunkType must either be INDEX_CHUNK or DATA_CHUNK");
591    }
592  }
593
594  synchronized void putbackChunks(Set<Integer> chunks) {
595    // if there is no pool just try to clear the chunkIdMap in case there is something
596    if (dataChunksPool == null && indexChunksPool == null) {
597      this.removeChunks(chunks);
598      return;
599    }
600
601    // if there is a pool, go over all chunk IDs that came back, the chunks may be from pool or not
602    for (int chunkID : chunks) {
603      // translate chunk ID to chunk, if chunk initially wasn't in pool
604      // this translation will (most likely) return null
605      Chunk chunk = ChunkCreator.this.getChunk(chunkID);
606      if (chunk != null) {
607        if (chunk.isFromPool() && chunk.isIndexChunk()) {
608          indexChunksPool.putbackChunks(chunk);
609        } else if (chunk.isFromPool() && chunk.size == dataChunksPool.getChunkSize()) {
610          dataChunksPool.putbackChunks(chunk);
611        } else {
612          // chunks which are not from one of the pools
613          // should be released without going to the pools.
614          // Removing them from chunkIdMap will cause their removal by the GC.
615          this.removeChunk(chunkID);
616        }
617      }
618      // if chunk is null, it was never covered by the chunkIdMap (and so wasn't in pool also),
619      // so we have nothing to do on its release
620    }
621    return;
622  }
623
624}
625