001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021package org.apache.hadoop.hbase.io.hfile.bucket;
022
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.FileNotFoundException;
026import java.io.FileOutputStream;
027import java.io.IOException;
028import java.io.ObjectInputStream;
029import java.io.ObjectOutputStream;
030import java.io.Serializable;
031import java.nio.ByteBuffer;
032import java.util.ArrayList;
033import java.util.Comparator;
034import java.util.HashSet;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.NavigableSet;
039import java.util.PriorityQueue;
040import java.util.Set;
041import java.util.concurrent.ArrayBlockingQueue;
042import java.util.concurrent.BlockingQueue;
043import java.util.concurrent.ConcurrentHashMap;
044import java.util.concurrent.ConcurrentMap;
045import java.util.concurrent.ConcurrentSkipListSet;
046import java.util.concurrent.Executors;
047import java.util.concurrent.ScheduledExecutorService;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.atomic.AtomicInteger;
050import java.util.concurrent.atomic.AtomicLong;
051import java.util.concurrent.atomic.LongAdder;
052import java.util.concurrent.locks.Lock;
053import java.util.concurrent.locks.ReentrantLock;
054import java.util.concurrent.locks.ReentrantReadWriteLock;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.hbase.HBaseConfiguration;
057import org.apache.hadoop.hbase.io.HeapSize;
058import org.apache.hadoop.hbase.io.hfile.BlockCache;
059import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
060import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
061import org.apache.hadoop.hbase.io.hfile.BlockPriority;
062import org.apache.hadoop.hbase.io.hfile.BlockType;
063import org.apache.hadoop.hbase.io.hfile.CacheStats;
064import org.apache.hadoop.hbase.io.hfile.Cacheable;
065import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
066import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
067import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
068import org.apache.hadoop.hbase.io.hfile.CachedBlock;
069import org.apache.hadoop.hbase.io.hfile.HFileBlock;
070import org.apache.hadoop.hbase.nio.ByteBuff;
071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
072import org.apache.hadoop.hbase.util.HasThread;
073import org.apache.hadoop.hbase.util.IdReadWriteLock;
074import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
075import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
076import org.apache.hadoop.util.StringUtils;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
082import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
083import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
084
085/**
086 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
087 * BucketCache#ramCache and BucketCache#backingMap in order to
088 * determine if a given element is in the cache. The bucket cache can use on-heap or
089 * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to
090 * store/read the block data.
091 *
092 * <p>Eviction is via a similar algorithm as used in
093 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
094 *
095 * <p>BucketCache can be used as mainly a block cache (see
096 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
097 * LruBlockCache to decrease CMS GC and heap fragmentation.
098 *
099 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
100 * blocks) to enlarge cache space via
101 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
102 */
103@InterfaceAudience.Private
104public class BucketCache implements BlockCache, HeapSize {
105  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);
106
107  /** Priority buckets config */
108  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
109  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
110  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
111  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
112  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
113  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
114
115  /** Priority buckets */
116  @VisibleForTesting
117  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
118  static final float DEFAULT_MULTI_FACTOR = 0.50f;
119  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
120  static final float DEFAULT_MIN_FACTOR = 0.85f;
121
122  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
123  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
124
125  // Number of blocks to clear for each of the bucket size that is full
126  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;
127
128  /** Statistics thread */
129  private static final int statThreadPeriod = 5 * 60;
130
131  final static int DEFAULT_WRITER_THREADS = 3;
132  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
133
134  // Store/read block data
135  final IOEngine ioEngine;
136
137  // Store the block in this map before writing it to cache
138  @VisibleForTesting
139  final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
140  // In this map, store the block's meta data like offset, length
141  @VisibleForTesting
142  ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
143
144  /**
145   * Flag if the cache is enabled or not... We shut it off if there are IO
146   * errors for some time, so that Bucket IO exceptions/errors don't bring down
147   * the HBase server.
148   */
149  private volatile boolean cacheEnabled;
150
151  /**
152   * A list of writer queues.  We have a queue per {@link WriterThread} we have running.
153   * In other words, the work adding blocks to the BucketCache is divided up amongst the
154   * running WriterThreads.  Its done by taking hash of the cache key modulo queue count.
155   * WriterThread when it runs takes whatever has been recently added and 'drains' the entries
156   * to the BucketCache.  It then updates the ramCache and backingMap accordingly.
157   */
158  @VisibleForTesting
159  final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
160  @VisibleForTesting
161  final WriterThread[] writerThreads;
162
163  /** Volatile boolean to track if free space is in process or not */
164  private volatile boolean freeInProgress = false;
165  private final Lock freeSpaceLock = new ReentrantLock();
166
167  private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>();
168
169  private final LongAdder realCacheSize = new LongAdder();
170  private final LongAdder heapSize = new LongAdder();
171  /** Current number of cached elements */
172  private final LongAdder blockNumber = new LongAdder();
173
174  /** Cache access count (sequential ID) */
175  private final AtomicLong accessCount = new AtomicLong();
176
177  private static final int DEFAULT_CACHE_WAIT_TIME = 50;
178  // Used in test now. If the flag is false and the cache speed is very fast,
179  // bucket cache will skip some blocks when caching. If the flag is true, we
180  // will wait blocks flushed to IOEngine for some time when caching
181  boolean wait_when_cache = false;
182
183  private final BucketCacheStats cacheStats = new BucketCacheStats();
184
185  private final String persistencePath;
186  private final long cacheCapacity;
187  /** Approximate block size */
188  private final long blockSize;
189
190  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
191  private final int ioErrorsTolerationDuration;
192  // 1 min
193  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
194
195  // Start time of first IO error when reading or writing IO Engine, it will be
196  // reset after a successful read/write.
197  private volatile long ioErrorStartTime = -1;
198
199  /**
200   * A ReentrantReadWriteLock to lock on a particular block identified by offset.
201   * The purpose of this is to avoid freeing the block which is being read.
202   * <p>
203   * Key set of offsets in BucketCache is limited so soft reference is the best choice here.
204   */
205  @VisibleForTesting
206  final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT);
207
208  private final NavigableSet<BlockCacheKey> blocksByHFile =
209      new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() {
210        @Override
211        public int compare(BlockCacheKey a, BlockCacheKey b) {
212          int nameComparison = a.getHfileName().compareTo(b.getHfileName());
213          if (nameComparison != 0) {
214            return nameComparison;
215          }
216
217          if (a.getOffset() == b.getOffset()) {
218            return 0;
219          } else if (a.getOffset() < b.getOffset()) {
220            return -1;
221          }
222          return 1;
223        }
224      });
225
226  /** Statistics thread schedule pool (for heavy debugging, could remove) */
227  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
228    new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());
229
230  // Allocate or free space for the block
231  private BucketAllocator bucketAllocator;
232
233  /** Acceptable size of cache (no evictions if size < acceptable) */
234  private float acceptableFactor;
235
236  /** Minimum threshold of cache (when evicting, evict until size < min) */
237  private float minFactor;
238
239  /** Free this floating point factor of extra blocks when evicting. For example free the number of blocks requested * (1 + extraFreeFactor) */
240  private float extraFreeFactor;
241
242  /** Single access bucket size */
243  private float singleFactor;
244
245  /** Multiple access bucket size */
246  private float multiFactor;
247
248  /** In-memory bucket size */
249  private float memoryFactor;
250
251  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
252      int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
253      IOException {
254    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
255      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
256  }
257
258  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
259                     int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
260                     Configuration conf)
261      throws FileNotFoundException, IOException {
262    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
263    this.writerThreads = new WriterThread[writerThreadNum];
264    long blockNumCapacity = capacity / blockSize;
265    if (blockNumCapacity >= Integer.MAX_VALUE) {
266      // Enough for about 32TB of cache!
267      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
268    }
269
270    this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
271    this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
272    this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
273    this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
274    this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
275    this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
276
277    sanityCheckConfigs();
278
279    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor +
280        ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor +
281        ", memoryFactor: " + memoryFactor);
282
283    this.cacheCapacity = capacity;
284    this.persistencePath = persistencePath;
285    this.blockSize = blockSize;
286    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
287
288    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
289    for (int i = 0; i < writerThreads.length; ++i) {
290      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
291    }
292
293    assert writerQueues.size() == writerThreads.length;
294    this.ramCache = new ConcurrentHashMap<>();
295
296    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
297
298    if (ioEngine.isPersistent() && persistencePath != null) {
299      try {
300        retrieveFromFile(bucketSizes);
301      } catch (IOException ioex) {
302        LOG.error("Can't restore from file because of", ioex);
303      } catch (ClassNotFoundException cnfe) {
304        LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
305        throw new RuntimeException(cnfe);
306      }
307    }
308    final String threadName = Thread.currentThread().getName();
309    this.cacheEnabled = true;
310    for (int i = 0; i < writerThreads.length; ++i) {
311      writerThreads[i] = new WriterThread(writerQueues.get(i));
312      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
313      writerThreads[i].setDaemon(true);
314    }
315    startWriterThreads();
316
317    // Run the statistics thread periodically to print the cache statistics log
318    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
319    // every five minutes.
320    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
321        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
322    LOG.info("Started bucket cache; ioengine=" + ioEngineName +
323        ", capacity=" + StringUtils.byteDesc(capacity) +
324      ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
325        writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
326      persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
327  }
328
329  private void sanityCheckConfigs() {
330    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
331    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
332    Preconditions.checkArgument(minFactor <= acceptableFactor, MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
333    Preconditions.checkArgument(extraFreeFactor >= 0, EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0");
334    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
335    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
336    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
337    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, SINGLE_FACTOR_CONFIG_NAME + ", " +
338        MULTI_FACTOR_CONFIG_NAME + ", and " + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
339  }
340
341  /**
342   * Called by the constructor to start the writer threads. Used by tests that need to override
343   * starting the threads.
344   */
345  @VisibleForTesting
346  protected void startWriterThreads() {
347    for (WriterThread thread : writerThreads) {
348      thread.start();
349    }
350  }
351
352  @VisibleForTesting
353  boolean isCacheEnabled() {
354    return this.cacheEnabled;
355  }
356
357  @Override
358  public long getMaxSize() {
359    return this.cacheCapacity;
360  }
361
362  public String getIoEngine() {
363    return ioEngine.toString();
364  }
365
366  /**
367   * Get the IOEngine from the IO engine name
368   * @param ioEngineName
369   * @param capacity
370   * @param persistencePath
371   * @return the IOEngine
372   * @throws IOException
373   */
374  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
375      throws IOException {
376    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
377      // In order to make the usage simple, we only need the prefix 'files:' in
378      // document whether one or multiple file(s), but also support 'file:' for
379      // the compatibility
380      String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1)
381          .split(FileIOEngine.FILE_DELIMITER);
382      return new FileIOEngine(capacity, persistencePath != null, filePaths);
383    } else if (ioEngineName.startsWith("offheap")) {
384      return new ByteBufferIOEngine(capacity);
385    } else if (ioEngineName.startsWith("mmap:")) {
386      return new FileMmapEngine(ioEngineName.substring(5), capacity);
387    } else {
388      throw new IllegalArgumentException(
389          "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap");
390    }
391  }
392
393  /**
394   * Cache the block with the specified name and buffer.
395   * @param cacheKey block's cache key
396   * @param buf block buffer
397   */
398  @Override
399  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
400    cacheBlock(cacheKey, buf, false);
401  }
402
403  /**
404   * Cache the block with the specified name and buffer.
405   * @param cacheKey block's cache key
406   * @param cachedItem block buffer
407   * @param inMemory if block is in-memory
408   */
409  @Override
410  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
411    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
412  }
413
414  /**
415   * Cache the block to ramCache
416   * @param cacheKey block's cache key
417   * @param cachedItem block buffer
418   * @param inMemory if block is in-memory
419   * @param wait if true, blocking wait when queue is full
420   */
421  private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
422      boolean wait) {
423    if (cacheEnabled) {
424      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
425        if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
426          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
427        }
428      } else {
429        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
430      }
431    }
432  }
433
434  private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
435      boolean inMemory, boolean wait) {
436    if (!cacheEnabled) {
437      return;
438    }
439    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
440    // Stuff the entry into the RAM cache so it can get drained to the persistent store
441    RAMQueueEntry re =
442        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
443    /**
444     * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
445     * key in ramCache, the heap size of bucket cache need to update if replacing entry from
446     * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if
447     * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct
448     * one, then the heap size will mess up (HBASE-20789)
449     */
450    if (ramCache.putIfAbsent(cacheKey, re) != null) {
451      return;
452    }
453    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
454    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
455    boolean successfulAddition = false;
456    if (wait) {
457      try {
458        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
459      } catch (InterruptedException e) {
460        Thread.currentThread().interrupt();
461      }
462    } else {
463      successfulAddition = bq.offer(re);
464    }
465    if (!successfulAddition) {
466      ramCache.remove(cacheKey);
467      cacheStats.failInsert();
468    } else {
469      this.blockNumber.increment();
470      this.heapSize.add(cachedItem.heapSize());
471      blocksByHFile.add(cacheKey);
472    }
473  }
474
475  /**
476   * Get the buffer of the block with the specified key.
477   * @param key block's cache key
478   * @param caching true if the caller caches blocks on cache misses
479   * @param repeat Whether this is a repeat lookup for the same block
480   * @param updateCacheMetrics Whether we should update cache metrics or not
481   * @return buffer of specified cache key, or null if not in cache
482   */
483  @Override
484  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
485      boolean updateCacheMetrics) {
486    if (!cacheEnabled) {
487      return null;
488    }
489    RAMQueueEntry re = ramCache.get(key);
490    if (re != null) {
491      if (updateCacheMetrics) {
492        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
493      }
494      re.access(accessCount.incrementAndGet());
495      return re.getData();
496    }
497    BucketEntry bucketEntry = backingMap.get(key);
498    if (bucketEntry != null) {
499      long start = System.nanoTime();
500      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
501      try {
502        lock.readLock().lock();
503        // We can not read here even if backingMap does contain the given key because its offset
504        // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
505        // existence here.
506        if (bucketEntry.equals(backingMap.get(key))) {
507          // TODO : change this area - should be removed after server cells and
508          // 12295 are available
509          int len = bucketEntry.getLength();
510          if (LOG.isTraceEnabled()) {
511            LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
512          }
513          Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
514              bucketEntry.deserializerReference(this.deserialiserMap));
515          long timeTaken = System.nanoTime() - start;
516          if (updateCacheMetrics) {
517            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
518            cacheStats.ioHit(timeTaken);
519          }
520          if (cachedBlock.getMemoryType() == MemoryType.SHARED) {
521            bucketEntry.incrementRefCountAndGet();
522          }
523          bucketEntry.access(accessCount.incrementAndGet());
524          if (this.ioErrorStartTime > 0) {
525            ioErrorStartTime = -1;
526          }
527          return cachedBlock;
528        }
529      } catch (IOException ioex) {
530        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
531        checkIOErrorIsTolerated();
532      } finally {
533        lock.readLock().unlock();
534      }
535    }
536    if (!repeat && updateCacheMetrics) {
537      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
538    }
539    return null;
540  }
541
542  @VisibleForTesting
543  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
544    bucketAllocator.freeBlock(bucketEntry.offset());
545    realCacheSize.add(-1 * bucketEntry.getLength());
546    blocksByHFile.remove(cacheKey);
547    if (decrementBlockNumber) {
548      this.blockNumber.decrement();
549    }
550  }
551
552  @Override
553  public boolean evictBlock(BlockCacheKey cacheKey) {
554    return evictBlock(cacheKey, true);
555  }
556
557  private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
558    RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
559    if (removedBlock != null) {
560      this.blockNumber.decrement();
561      this.heapSize.add(-1 * removedBlock.getData().heapSize());
562    }
563    return removedBlock;
564  }
565
566  public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
567    if (!cacheEnabled) {
568      return false;
569    }
570    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
571    BucketEntry bucketEntry = backingMap.get(cacheKey);
572    if (bucketEntry == null) {
573      if (removedBlock != null) {
574        cacheStats.evicted(0, cacheKey.isPrimary());
575        return true;
576      } else {
577        return false;
578      }
579    }
580    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
581    try {
582      lock.writeLock().lock();
583      int refCount = bucketEntry.getRefCount();
584      if (refCount == 0) {
585        if (backingMap.remove(cacheKey, bucketEntry)) {
586          blockEvicted(cacheKey, bucketEntry, removedBlock == null);
587        } else {
588          return false;
589        }
590      } else {
591        if(!deletedBlock) {
592          if (LOG.isDebugEnabled()) {
593            LOG.debug("This block " + cacheKey + " is still referred by " + refCount
594                + " readers. Can not be freed now");
595          }
596          return false;
597        } else {
598          if (LOG.isDebugEnabled()) {
599            LOG.debug("This block " + cacheKey + " is still referred by " + refCount
600                + " readers. Can not be freed now. Hence will mark this"
601                + " for evicting at a later point");
602          }
603          bucketEntry.markForEvict();
604        }
605      }
606    } finally {
607      lock.writeLock().unlock();
608    }
609    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
610    return true;
611  }
612
613  /*
614   * Statistics thread.  Periodically output cache statistics to the log.
615   */
616  private static class StatisticsThread extends Thread {
617    private final BucketCache bucketCache;
618
619    public StatisticsThread(BucketCache bucketCache) {
620      super("BucketCacheStatsThread");
621      setDaemon(true);
622      this.bucketCache = bucketCache;
623    }
624
625    @Override
626    public void run() {
627      bucketCache.logStats();
628    }
629  }
630
631  public void logStats() {
632    long totalSize = bucketAllocator.getTotalSize();
633    long usedSize = bucketAllocator.getUsedSize();
634    long freeSize = totalSize - usedSize;
635    long cacheSize = getRealCacheSize();
636    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " +
637        "totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
638        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
639        "usedSize=" + StringUtils.byteDesc(usedSize) +", " +
640        "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " +
641        "accesses=" + cacheStats.getRequestCount() + ", " +
642        "hits=" + cacheStats.getHitCount() + ", " +
643        "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
644        "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
645        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
646          (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
647        "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
648        "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
649        "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
650          (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
651        "evictions=" + cacheStats.getEvictionCount() + ", " +
652        "evicted=" + cacheStats.getEvictedCount() + ", " +
653        "evictedPerRun=" + cacheStats.evictedPerEviction());
654    cacheStats.reset();
655  }
656
657  public long getRealCacheSize() {
658    return this.realCacheSize.sum();
659  }
660
661  private long acceptableSize() {
662    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
663  }
664
665  @VisibleForTesting
666  long getPartitionSize(float partitionFactor) {
667    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
668  }
669
670  /**
671   * Return the count of bucketSizeinfos still need free space
672   */
673  private int bucketSizesAboveThresholdCount(float minFactor) {
674    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
675    int fullCount = 0;
676    for (int i = 0; i < stats.length; i++) {
677      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
678      freeGoal = Math.max(freeGoal, 1);
679      if (stats[i].freeCount() < freeGoal) {
680        fullCount++;
681      }
682    }
683    return fullCount;
684  }
685
686  /**
687   * This method will find the buckets that are minimally occupied
688   * and are not reference counted and will free them completely
689   * without any constraint on the access times of the elements,
690   * and as a process will completely free at most the number of buckets
691   * passed, sometimes it might not due to changing refCounts
692   *
693   * @param completelyFreeBucketsNeeded number of buckets to free
694   **/
695  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
696    if (completelyFreeBucketsNeeded != 0) {
697      // First we will build a set where the offsets are reference counted, usually
698      // this set is small around O(Handler Count) unless something else is wrong
699      Set<Integer> inUseBuckets = new HashSet<Integer>();
700      for (BucketEntry entry : backingMap.values()) {
701        if (entry.getRefCount() != 0) {
702          inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
703        }
704      }
705
706      Set<Integer> candidateBuckets = bucketAllocator.getLeastFilledBuckets(
707          inUseBuckets, completelyFreeBucketsNeeded);
708      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
709        if (candidateBuckets.contains(bucketAllocator
710            .getBucketIndex(entry.getValue().offset()))) {
711          evictBlock(entry.getKey(), false);
712        }
713      }
714    }
715  }
716
717  /**
718   * Free the space if the used size reaches acceptableSize() or one size block
719   * couldn't be allocated. When freeing the space, we use the LRU algorithm and
720   * ensure there must be some blocks evicted
721   * @param why Why we are being called
722   */
723  private void freeSpace(final String why) {
724    // Ensure only one freeSpace progress at a time
725    if (!freeSpaceLock.tryLock()) {
726      return;
727    }
728    try {
729      freeInProgress = true;
730      long bytesToFreeWithoutExtra = 0;
731      // Calculate free byte for each bucketSizeinfo
732      StringBuilder msgBuffer = LOG.isDebugEnabled()? new StringBuilder(): null;
733      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
734      long[] bytesToFreeForBucket = new long[stats.length];
735      for (int i = 0; i < stats.length; i++) {
736        bytesToFreeForBucket[i] = 0;
737        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
738        freeGoal = Math.max(freeGoal, 1);
739        if (stats[i].freeCount() < freeGoal) {
740          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
741          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
742          if (msgBuffer != null) {
743            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
744              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
745          }
746        }
747      }
748      if (msgBuffer != null) {
749        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
750      }
751
752      if (bytesToFreeWithoutExtra <= 0) {
753        return;
754      }
755      long currentSize = bucketAllocator.getUsedSize();
756      long totalSize = bucketAllocator.getTotalSize();
757      if (LOG.isDebugEnabled() && msgBuffer != null) {
758        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
759          " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
760          StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize));
761      }
762
763      long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
764          * (1 + extraFreeFactor));
765
766      // Instantiate priority buckets
767      BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
768          blockSize, getPartitionSize(singleFactor));
769      BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
770          blockSize, getPartitionSize(multiFactor));
771      BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
772          blockSize, getPartitionSize(memoryFactor));
773
774      // Scan entire map putting bucket entry into appropriate bucket entry
775      // group
776      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
777        switch (bucketEntryWithKey.getValue().getPriority()) {
778          case SINGLE: {
779            bucketSingle.add(bucketEntryWithKey);
780            break;
781          }
782          case MULTI: {
783            bucketMulti.add(bucketEntryWithKey);
784            break;
785          }
786          case MEMORY: {
787            bucketMemory.add(bucketEntryWithKey);
788            break;
789          }
790        }
791      }
792
793      PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<>(3,
794          Comparator.comparingLong(BucketEntryGroup::overflow));
795
796      bucketQueue.add(bucketSingle);
797      bucketQueue.add(bucketMulti);
798      bucketQueue.add(bucketMemory);
799
800      int remainingBuckets = 3;
801      long bytesFreed = 0;
802
803      BucketEntryGroup bucketGroup;
804      while ((bucketGroup = bucketQueue.poll()) != null) {
805        long overflow = bucketGroup.overflow();
806        if (overflow > 0) {
807          long bucketBytesToFree = Math.min(overflow,
808              (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
809          bytesFreed += bucketGroup.free(bucketBytesToFree);
810        }
811        remainingBuckets--;
812      }
813
814      // Check and free if there are buckets that still need freeing of space
815      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
816        bucketQueue.clear();
817        remainingBuckets = 3;
818
819        bucketQueue.add(bucketSingle);
820        bucketQueue.add(bucketMulti);
821        bucketQueue.add(bucketMemory);
822
823        while ((bucketGroup = bucketQueue.poll()) != null) {
824          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
825          bytesFreed += bucketGroup.free(bucketBytesToFree);
826          remainingBuckets--;
827        }
828      }
829
830      // Even after the above free we might still need freeing because of the
831      // De-fragmentation of the buckets (also called Slab Calcification problem), i.e
832      // there might be some buckets where the occupancy is very sparse and thus are not
833      // yielding the free for the other bucket sizes, the fix for this to evict some
834      // of the buckets, we do this by evicting the buckets that are least fulled
835      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR *
836          bucketSizesAboveThresholdCount(1.0f));
837
838      if (LOG.isDebugEnabled()) {
839        long single = bucketSingle.totalSize();
840        long multi = bucketMulti.totalSize();
841        long memory = bucketMemory.totalSize();
842        if (LOG.isDebugEnabled()) {
843          LOG.debug("Bucket cache free space completed; " + "freed="
844            + StringUtils.byteDesc(bytesFreed) + ", " + "total="
845            + StringUtils.byteDesc(totalSize) + ", " + "single="
846            + StringUtils.byteDesc(single) + ", " + "multi="
847            + StringUtils.byteDesc(multi) + ", " + "memory="
848            + StringUtils.byteDesc(memory));
849        }
850      }
851
852    } catch (Throwable t) {
853      LOG.warn("Failed freeing space", t);
854    } finally {
855      cacheStats.evict();
856      freeInProgress = false;
857      freeSpaceLock.unlock();
858    }
859  }
860
  // This handles flushing the RAM cache to IOEngine.
  @VisibleForTesting
  class WriterThread extends HasThread {
    // Blocks queued for this thread to flush to the IOEngine.
    private final BlockingQueue<RAMQueueEntry> inputQueue;
    // Flipped off by disableWriter() (tests only) to stop the run loop.
    private volatile boolean writerEnabled = true;

    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
      super("BucketCacheWriterThread");
      this.inputQueue = queue;
    }

    // Used for test
    @VisibleForTesting
    void disableWriter() {
      this.writerEnabled = false;
    }

    /**
     * Main loop: block until entries are queued, then drain them to the
     * IOEngine via {@link #doDrain(List)}. Exits when the cache is disabled or
     * a test disables the writer.
     */
    @Override
    public void run() {
      List<RAMQueueEntry> entries = new ArrayList<>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            try {
              // Blocks
              entries = getRAMQueueEntries(inputQueue, entries);
            } catch (InterruptedException ie) {
              // Interrupt is how shutdown wakes us; if the cache is still
              // enabled we fall through and drain whatever we already hold.
              if (!cacheEnabled) break;
            }
            doDrain(entries);
          } catch (Exception ioe) {
            // Log and keep looping; one bad batch must not kill the writer.
            LOG.error("WriterThread encountered error", ioe);
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed doing drain", t);
      }
      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
    }

    /**
     * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
     * cache with a new block for the same cache key. there's a corner case: one thread cache a
     * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another
     * new block with the same cache key do the same thing for the same cache key, so if not evict
     * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone
     * but the bucketAllocator do not free its memory.
     * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
     *      cacheKey, Cacheable newBlock)
     * @param key Block cache key
     * @param bucketEntry Bucket entry to put into backingMap.
     */
    private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
      BucketEntry previousEntry = backingMap.put(key, bucketEntry);
      if (previousEntry != null && previousEntry != bucketEntry) {
        // Evict the displaced entry under its offset write-lock so readers of
        // that offset can't race with the free.
        ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset());
        lock.writeLock().lock();
        try {
          blockEvicted(key, previousEntry, false);
        } finally {
          lock.writeLock().unlock();
        }
      }
    }

    /**
     * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
     * Process all that are passed in even if failure being sure to remove from ramCache else we'll
     * never undo the references and we'll OOME.
     * @param entries Presumes list passed in here will be processed by this invocation only. No
     *   interference expected.
     * @throws InterruptedException
     */
    @VisibleForTesting
    void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
      if (entries.isEmpty()) {
        return;
      }
      // This method is a little hard to follow. We run through the passed in entries and for each
      // successful add, we add a non-null BucketEntry to the below bucketEntries.  Later we must
      // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
      // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
      // filling ramCache.  We do the clean up by again running through the passed in entries
      // doing extra work when we find a non-null bucketEntries corresponding entry.
      final int size = entries.size();
      BucketEntry[] bucketEntries = new BucketEntry[size];
      // Index updated inside loop if success or if we can't succeed. We retry if cache is full
      // when we go to add an entry by going around the loop again without upping the index.
      int index = 0;
      while (cacheEnabled && index < size) {
        RAMQueueEntry re = null;
        try {
          re = entries.get(index);
          if (re == null) {
            LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
            index++;
            continue;
          }
          BucketEntry bucketEntry =
            re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
          // Successfully added.  Up index and add bucketEntry. Clear io exceptions.
          bucketEntries[index] = bucketEntry;
          if (ioErrorStartTime > 0) {
            // Reset the IO-error streak; see checkIOErrorIsTolerated().
            ioErrorStartTime = -1;
          }
          index++;
        } catch (BucketAllocatorException fle) {
          LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
          // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
          bucketEntries[index] = null;
          index++;
        } catch (CacheFullException cfe) {
          // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
          if (!freeInProgress) {
            freeSpace("Full!");
          } else {
            Thread.sleep(50);
          }
        } catch (IOException ioex) {
          // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
          LOG.error("Failed writing to bucket cache", ioex);
          checkIOErrorIsTolerated();
        }
      }

      // Make sure data pages are written on media before we update maps.
      try {
        ioEngine.sync();
      } catch (IOException ioex) {
        LOG.error("Failed syncing IO engine", ioex);
        checkIOErrorIsTolerated();
        // Since we failed sync, free the blocks in bucket allocator
        for (int i = 0; i < entries.size(); ++i) {
          if (bucketEntries[i] != null) {
            bucketAllocator.freeBlock(bucketEntries[i].offset());
            bucketEntries[i] = null;
          }
        }
      }

      // Now add to backingMap if successfully added to bucket cache.  Remove from ramCache if
      // success or error.
      for (int i = 0; i < size; ++i) {
        BlockCacheKey key = entries.get(i).getKey();
        // Only add if non-null entry.
        if (bucketEntries[i] != null) {
          putIntoBackingMap(key, bucketEntries[i]);
        }
        // Always remove from ramCache even if we failed adding it to the block cache above.
        RAMQueueEntry ramCacheEntry = ramCache.remove(key);
        if (ramCacheEntry != null) {
          heapSize.add(-1 * entries.get(i).getData().heapSize());
        } else if (bucketEntries[i] != null){
          // Block should have already been evicted. Remove it and free space.
          ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
          try {
            lock.writeLock().lock();
            int refCount = bucketEntries[i].getRefCount();
            if (refCount == 0) {
              if (backingMap.remove(key, bucketEntries[i])) {
                blockEvicted(key, bucketEntries[i], false);
              } else {
                // Someone replaced our entry in backingMap; defer the free.
                bucketEntries[i].markForEvict();
              }
            } else {
              // Readers still hold references; defer the free until refCount drains.
              bucketEntries[i].markForEvict();
            }
          } finally {
            lock.writeLock().unlock();
          }
        }
      }

      long used = bucketAllocator.getUsedSize();
      if (used > acceptableSize()) {
        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
      }
      return;
    }
  }
1041
1042  /**
1043   * Blocks until elements available in {@code q} then tries to grab as many as possible
1044   * before returning.
1045   * @param receptacle Where to stash the elements taken from queue. We clear before we use it
1046   *     just in case.
1047   * @param q The queue to take from.
1048   * @return {@code receptacle} laden with elements taken from the queue or empty if none found.
1049   */
1050  @VisibleForTesting
1051  static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
1052      final List<RAMQueueEntry> receptacle)
1053  throws InterruptedException {
1054    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
1055    // ok even if list grew to accommodate thousands.
1056    receptacle.clear();
1057    receptacle.add(q.take());
1058    q.drainTo(receptacle);
1059    return receptacle;
1060  }
1061
1062  private void persistToFile() throws IOException {
1063    assert !cacheEnabled;
1064    FileOutputStream fos = null;
1065    ObjectOutputStream oos = null;
1066    try {
1067      if (!ioEngine.isPersistent()) {
1068        throw new IOException("Attempt to persist non-persistent cache mappings!");
1069      }
1070      fos = new FileOutputStream(persistencePath, false);
1071      oos = new ObjectOutputStream(fos);
1072      oos.writeLong(cacheCapacity);
1073      oos.writeUTF(ioEngine.getClass().getName());
1074      oos.writeUTF(backingMap.getClass().getName());
1075      oos.writeObject(deserialiserMap);
1076      oos.writeObject(backingMap);
1077    } finally {
1078      if (oos != null) oos.close();
1079      if (fos != null) fos.close();
1080    }
1081  }
1082
1083  @SuppressWarnings("unchecked")
1084  private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
1085      ClassNotFoundException {
1086    File persistenceFile = new File(persistencePath);
1087    if (!persistenceFile.exists()) {
1088      return;
1089    }
1090    assert !cacheEnabled;
1091    FileInputStream fis = null;
1092    ObjectInputStream ois = null;
1093    try {
1094      if (!ioEngine.isPersistent())
1095        throw new IOException(
1096            "Attempt to restore non-persistent cache mappings!");
1097      fis = new FileInputStream(persistencePath);
1098      ois = new ObjectInputStream(fis);
1099      long capacitySize = ois.readLong();
1100      if (capacitySize != cacheCapacity)
1101        throw new IOException("Mismatched cache capacity:"
1102            + StringUtils.byteDesc(capacitySize) + ", expected: "
1103            + StringUtils.byteDesc(cacheCapacity));
1104      String ioclass = ois.readUTF();
1105      String mapclass = ois.readUTF();
1106      if (!ioEngine.getClass().getName().equals(ioclass))
1107        throw new IOException("Class name for IO engine mismatch: " + ioclass
1108            + ", expected:" + ioEngine.getClass().getName());
1109      if (!backingMap.getClass().getName().equals(mapclass))
1110        throw new IOException("Class name for cache map mismatch: " + mapclass
1111            + ", expected:" + backingMap.getClass().getName());
1112      UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
1113          .readObject();
1114      ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
1115          (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
1116      BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
1117          backingMapFromFile, realCacheSize);
1118      bucketAllocator = allocator;
1119      deserialiserMap = deserMap;
1120      backingMap = backingMapFromFile;
1121    } finally {
1122      if (ois != null) ois.close();
1123      if (fis != null) fis.close();
1124      if (!persistenceFile.delete()) {
1125        throw new IOException("Failed deleting persistence file "
1126            + persistenceFile.getAbsolutePath());
1127      }
1128    }
1129  }
1130
1131  /**
1132   * Check whether we tolerate IO error this time. If the duration of IOEngine
1133   * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the
1134   * cache
1135   */
1136  private void checkIOErrorIsTolerated() {
1137    long now = EnvironmentEdgeManager.currentTime();
1138    if (this.ioErrorStartTime > 0) {
1139      if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
1140        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
1141          "ms, disabling cache, please check your IOEngine");
1142        disableCache();
1143      }
1144    } else {
1145      this.ioErrorStartTime = now;
1146    }
1147  }
1148
1149  /**
1150   * Used to shut down the cache -or- turn it off in the case of something broken.
1151   */
1152  private void disableCache() {
1153    if (!cacheEnabled) return;
1154    cacheEnabled = false;
1155    ioEngine.shutdown();
1156    this.scheduleThreadPool.shutdown();
1157    for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt();
1158    this.ramCache.clear();
1159    if (!ioEngine.isPersistent() || persistencePath == null) {
1160      // If persistent ioengine and a path, we will serialize out the backingMap.
1161      this.backingMap.clear();
1162    }
1163  }
1164
1165  private void join() throws InterruptedException {
1166    for (int i = 0; i < writerThreads.length; ++i)
1167      writerThreads[i].join();
1168  }
1169
1170  @Override
1171  public void shutdown() {
1172    disableCache();
1173    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
1174        + "; path to write=" + persistencePath);
1175    if (ioEngine.isPersistent() && persistencePath != null) {
1176      try {
1177        join();
1178        persistToFile();
1179      } catch (IOException ex) {
1180        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1181      } catch (InterruptedException e) {
1182        LOG.warn("Failed to persist data on exit", e);
1183      }
1184    }
1185  }
1186
  /** @return the live statistics object for this cache. */
  @Override
  public CacheStats getStats() {
    return cacheStats;
  }
1191
  /** @return the {@link BucketAllocator} that manages this cache's storage. */
  public BucketAllocator getAllocator() {
    return this.bucketAllocator;
  }
1195
  /**
   * @return heap bytes occupied by blocks still buffered in the RAM cache;
   *     the counter is decremented as entries are drained to the IOEngine
   *     (see WriterThread#doDrain).
   */
  @Override
  public long heapSize() {
    return this.heapSize.sum();
  }
1200
  /**
   * @return bytes of cached data written into the IOEngine; realCacheSize is
   *     incremented on each successful RAMQueueEntry#writeToCache.
   */
  @Override
  public long size() {
    return this.realCacheSize.sum();
  }
1205
  /** Delegates to {@link #size()}. */
  @Override
  public long getCurrentDataSize() {
    return size();
  }
1210
  /** @return free bytes remaining in the underlying {@link BucketAllocator}. */
  @Override
  public long getFreeSize() {
    return this.bucketAllocator.getFreeSize();
  }
1215
  /**
   * @return number of cached blocks per the blockNumber counter (maintained by
   *     the cache/evict paths, not visible in this section).
   */
  @Override
  public long getBlockCount() {
    return this.blockNumber.sum();
  }
1220
  /** Delegates to {@link #getBlockCount()}. */
  @Override
  public long getDataBlockCount() {
    return getBlockCount();
  }
1225
  /** @return bytes currently allocated (in use) in the {@link BucketAllocator}. */
  @Override
  public long getCurrentSize() {
    return this.bucketAllocator.getUsedSize();
  }
1230
1231  /**
1232   * Evicts all blocks for a specific HFile.
1233   * <p>
1234   * This is used for evict-on-close to remove all blocks of a specific HFile.
1235   *
1236   * @return the number of blocks evicted
1237   */
1238  @Override
1239  public int evictBlocksByHfileName(String hfileName) {
1240    Set<BlockCacheKey> keySet = blocksByHFile.subSet(
1241        new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
1242        new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
1243
1244    int numEvicted = 0;
1245    for (BlockCacheKey key : keySet) {
1246      if (evictBlock(key)) {
1247          ++numEvicted;
1248      }
1249    }
1250
1251    return numEvicted;
1252  }
1253
1254  /**
1255   * Item in cache. We expect this to be where most memory goes. Java uses 8
1256   * bytes just for object headers; after this, we want to use as little as
1257   * possible - so we only use 8 bytes, but in order to do so we end up messing
1258   * around with all this Java casting stuff. Offset stored as 5 bytes that make
1259   * up the long. Doubt we'll see devices this big for ages. Offsets are divided
1260   * by 256. So 5 bytes gives us 256TB or so.
1261   */
1262  static class BucketEntry implements Serializable {
1263    private static final long serialVersionUID = -6741504807982257534L;
1264
1265    // access counter comparator, descending order
1266    static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
1267
1268      @Override
1269      public int compare(BucketEntry o1, BucketEntry o2) {
1270        return Long.compare(o2.accessCounter, o1.accessCounter);
1271      }
1272    };
1273
1274    private int offsetBase;
1275    private int length;
1276    private byte offset1;
1277    byte deserialiserIndex;
1278    private volatile long accessCounter;
1279    private BlockPriority priority;
1280
1281    /**
1282     * Time this block was cached.  Presumes we are created just before we are added to the cache.
1283     */
1284    private final long cachedTime = System.nanoTime();
1285
1286    BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
1287      setOffset(offset);
1288      this.length = length;
1289      this.accessCounter = accessCounter;
1290      if (inMemory) {
1291        this.priority = BlockPriority.MEMORY;
1292      } else {
1293        this.priority = BlockPriority.SINGLE;
1294      }
1295    }
1296
1297    long offset() { // Java has no unsigned numbers
1298      long o = ((long) offsetBase) & 0xFFFFFFFFL; //This needs the L cast otherwise it will be sign extended as a negative number.
1299      o += (((long) (offset1)) & 0xFF) << 32; //The 0xFF here does not need the L cast because it is treated as a positive int.
1300      return o << 8;
1301    }
1302
1303    private void setOffset(long value) {
1304      assert (value & 0xFF) == 0;
1305      value >>= 8;
1306      offsetBase = (int) value;
1307      offset1 = (byte) (value >> 32);
1308    }
1309
1310    public int getLength() {
1311      return length;
1312    }
1313
1314    protected CacheableDeserializer<Cacheable> deserializerReference(
1315        UniqueIndexMap<Integer> deserialiserMap) {
1316      return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
1317          .unmap(deserialiserIndex));
1318    }
1319
1320    protected void setDeserialiserReference(
1321        CacheableDeserializer<Cacheable> deserializer,
1322        UniqueIndexMap<Integer> deserialiserMap) {
1323      this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
1324          .getDeserialiserIdentifier()));
1325    }
1326
1327    /**
1328     * Block has been accessed. Update its local access counter.
1329     */
1330    public void access(long accessCounter) {
1331      this.accessCounter = accessCounter;
1332      if (this.priority == BlockPriority.SINGLE) {
1333        this.priority = BlockPriority.MULTI;
1334      }
1335    }
1336
1337    public BlockPriority getPriority() {
1338      return this.priority;
1339    }
1340
1341    public long getCachedTime() {
1342      return cachedTime;
1343    }
1344
1345    protected int getRefCount() {
1346      return 0;
1347    }
1348
1349    protected int incrementRefCountAndGet() {
1350      return 0;
1351    }
1352
1353    protected int decrementRefCountAndGet() {
1354      return 0;
1355    }
1356
1357    protected boolean isMarkedForEvict() {
1358      return false;
1359    }
1360
1361    protected void markForEvict() {
1362      // noop;
1363    }
1364  }
1365
1366  static class SharedMemoryBucketEntry extends BucketEntry {
1367    private static final long serialVersionUID = -2187147283772338481L;
1368
1369    // Set this when we were not able to forcefully evict the block
1370    private volatile boolean markedForEvict;
1371    private AtomicInteger refCount = new AtomicInteger(0);
1372
1373    SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
1374      super(offset, length, accessCounter, inMemory);
1375    }
1376
1377    @Override
1378    protected int getRefCount() {
1379      return this.refCount.get();
1380    }
1381
1382    @Override
1383    protected int incrementRefCountAndGet() {
1384      return this.refCount.incrementAndGet();
1385    }
1386
1387    @Override
1388    protected int decrementRefCountAndGet() {
1389      return this.refCount.decrementAndGet();
1390    }
1391
1392    @Override
1393    protected boolean isMarkedForEvict() {
1394      return this.markedForEvict;
1395    }
1396
1397    @Override
1398    protected void markForEvict() {
1399      this.markedForEvict = true;
1400    }
1401  }
1402
1403  /**
1404   * Used to group bucket entries into priority buckets. There will be a
1405   * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
1406   * the eviction algorithm takes the appropriate number of elements out of each
1407   * according to configuration parameters and their relative sizes.
1408   */
1409  private class BucketEntryGroup {
1410
1411    private CachedEntryQueue queue;
1412    private long totalSize = 0;
1413    private long bucketSize;
1414
1415    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1416      this.bucketSize = bucketSize;
1417      queue = new CachedEntryQueue(bytesToFree, blockSize);
1418      totalSize = 0;
1419    }
1420
1421    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1422      totalSize += block.getValue().getLength();
1423      queue.add(block);
1424    }
1425
1426    public long free(long toFree) {
1427      Map.Entry<BlockCacheKey, BucketEntry> entry;
1428      long freedBytes = 0;
1429      // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
1430      // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
1431      while ((entry = queue.pollLast()) != null) {
1432        if (evictBlock(entry.getKey(), false)) {
1433          freedBytes += entry.getValue().getLength();
1434        }
1435        if (freedBytes >= toFree) {
1436          return freedBytes;
1437        }
1438      }
1439      return freedBytes;
1440    }
1441
1442    public long overflow() {
1443      return totalSize - bucketSize;
1444    }
1445
1446    public long totalSize() {
1447      return totalSize;
1448    }
1449  }
1450
1451  /**
1452   * Block Entry stored in the memory with key,data and so on
1453   */
1454  @VisibleForTesting
1455  static class RAMQueueEntry {
1456    private BlockCacheKey key;
1457    private Cacheable data;
1458    private long accessCounter;
1459    private boolean inMemory;
1460
1461    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
1462        boolean inMemory) {
1463      this.key = bck;
1464      this.data = data;
1465      this.accessCounter = accessCounter;
1466      this.inMemory = inMemory;
1467    }
1468
1469    public Cacheable getData() {
1470      return data;
1471    }
1472
1473    public BlockCacheKey getKey() {
1474      return key;
1475    }
1476
1477    public void access(long accessCounter) {
1478      this.accessCounter = accessCounter;
1479    }
1480
1481    private BucketEntry getBucketEntry(IOEngine ioEngine, long offset, int len) {
1482      if (ioEngine.usesSharedMemory()) {
1483        if (UnsafeAvailChecker.isAvailable()) {
1484          return new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
1485        } else {
1486          return new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
1487        }
1488      } else {
1489        return new BucketEntry(offset, len, accessCounter, inMemory);
1490      }
1491    }
1492
1493    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator,
1494        final UniqueIndexMap<Integer> deserialiserMap, final LongAdder realCacheSize)
1495        throws IOException {
1496      int len = data.getSerializedLength();
1497      // This cacheable thing can't be serialized
1498      if (len == 0) {
1499        return null;
1500      }
1501      long offset = bucketAllocator.allocateBlock(len);
1502      boolean succ = false;
1503      BucketEntry bucketEntry;
1504      try {
1505        bucketEntry = getBucketEntry(ioEngine, offset, len);
1506        bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
1507        if (data instanceof HFileBlock) {
1508          // If an instance of HFileBlock, save on some allocations.
1509          HFileBlock block = (HFileBlock) data;
1510          ByteBuff sliceBuf = block.getBufferReadOnly();
1511          ByteBuffer metadata = block.getMetaData();
1512          ioEngine.write(sliceBuf, offset);
1513          ioEngine.write(metadata, offset + len - metadata.limit());
1514        } else {
1515          ByteBuffer bb = ByteBuffer.allocate(len);
1516          data.serialize(bb, true);
1517          ioEngine.write(bb, offset);
1518        }
1519        succ = true;
1520      } finally {
1521        if (!succ) {
1522          bucketAllocator.freeBlock(offset);
1523        }
1524      }
1525      realCacheSize.add(len);
1526      return bucketEntry;
1527    }
1528  }
1529
1530  /**
1531   * Only used in test
1532   * @throws InterruptedException
1533   */
1534  void stopWriterThreads() throws InterruptedException {
1535    for (WriterThread writerThread : writerThreads) {
1536      writerThread.disableWriter();
1537      writerThread.interrupt();
1538      writerThread.join();
1539    }
1540  }
1541
  @Override
  public Iterator<CachedBlock> iterator() {
    // Iterates the persisted (backing-map) view of the cache only.
    // Don't bother with ramcache since stuff is in here only a little while.
    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
        this.backingMap.entrySet().iterator();
    return new Iterator<CachedBlock>() {
      // Snapshot of "now" taken once at iterator creation so that all blocks
      // returned by this iterator report ages relative to the same instant.
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public CachedBlock next() {
        // Wrap the backing-map entry in a read-only CachedBlock view; the
        // returned object reads through to the live entry on each call.
        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return e.getValue().getPriority();
          }

          @Override
          public BlockType getBlockType() {
            // Not held by BucketEntry.  Could add it if wanted on BucketEntry creation.
            return null;
          }

          @Override
          public long getOffset() {
            return e.getKey().getOffset();
          }

          @Override
          public long getSize() {
            return e.getValue().getLength();
          }

          @Override
          public long getCachedTime() {
            return e.getValue().getCachedTime();
          }

          @Override
          public String getFilename() {
            return e.getKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            // Order by filename, then offset ascending, then cached-time
            // DESCENDING (most recently cached first).
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) return diff;

            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) return diff;
            // A negative cached time indicates a corrupt entry; fail loudly
            // rather than produce an inconsistent ordering.
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException("" + this.getCachedTime() + ", " +
                other.getCachedTime());
            }
            // Arguments reversed on purpose: newer cached time sorts first.
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          @Override
          public int hashCode() {
            // NOTE(review): hash is key-based while equals is compareTo-based
            // (filename/offset/cachedTime); equal blocks from the same key
            // still hash equally, so the contract holds for this usage.
            return e.getKey().hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            // Consistent with compareTo: equal iff compareTo returns 0.
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock)obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        // Read-only view; evictions must go through evictBlock().
        throw new UnsupportedOperationException();
      }
    };
  }
1632
  @Override
  public BlockCache[] getBlockCaches() {
    // BucketCache is a single-level cache with no nested caches, so there is
    // no array of sub-caches to expose; callers must handle the null return.
    return null;
  }
1637
1638  @Override
1639  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
1640    if (block.getMemoryType() == MemoryType.SHARED) {
1641      BucketEntry bucketEntry = backingMap.get(cacheKey);
1642      if (bucketEntry != null) {
1643        int refCount = bucketEntry.decrementRefCountAndGet();
1644        if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
1645          evictBlock(cacheKey);
1646        }
1647      }
1648    }
1649  }
1650
1651  @VisibleForTesting
1652  public int getRefCount(BlockCacheKey cacheKey) {
1653    BucketEntry bucketEntry = backingMap.get(cacheKey);
1654    if (bucketEntry != null) {
1655      return bucketEntry.getRefCount();
1656    }
1657    return 0;
1658  }
1659
  // Package-private accessor for the acceptable-occupancy factor.
  float getAcceptableFactor() {
    return acceptableFactor;
  }

  // Package-private accessor for the minimum-occupancy factor.
  float getMinFactor() {
    return minFactor;
  }

  // Package-private accessor for the extra-free-space factor.
  float getExtraFreeFactor() {
    return extraFreeFactor;
  }

  // Package-private accessor for the single-access priority bucket factor.
  float getSingleFactor() {
    return singleFactor;
  }

  // Package-private accessor for the multi-access priority bucket factor.
  float getMultiFactor() {
    return multiFactor;
  }

  // Package-private accessor for the in-memory priority bucket factor.
  float getMemoryFactor() {
    return memoryFactor;
  }
1683}