001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021package org.apache.hadoop.hbase.io.hfile.bucket;
022
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.FileNotFoundException;
026import java.io.FileOutputStream;
027import java.io.IOException;
028import java.io.ObjectInputStream;
029import java.io.ObjectOutputStream;
030import java.io.Serializable;
031import java.nio.ByteBuffer;
032import java.util.ArrayList;
033import java.util.Comparator;
034import java.util.HashSet;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.NavigableSet;
039import java.util.PriorityQueue;
040import java.util.Set;
041import java.util.concurrent.ArrayBlockingQueue;
042import java.util.concurrent.BlockingQueue;
043import java.util.concurrent.ConcurrentHashMap;
044import java.util.concurrent.ConcurrentMap;
045import java.util.concurrent.ConcurrentSkipListSet;
046import java.util.concurrent.Executors;
047import java.util.concurrent.ScheduledExecutorService;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.atomic.AtomicInteger;
050import java.util.concurrent.atomic.AtomicLong;
051import java.util.concurrent.atomic.LongAdder;
052import java.util.concurrent.locks.Lock;
053import java.util.concurrent.locks.ReentrantLock;
054import java.util.concurrent.locks.ReentrantReadWriteLock;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.hbase.HBaseConfiguration;
057import org.apache.hadoop.hbase.io.HeapSize;
058import org.apache.hadoop.hbase.io.hfile.BlockCache;
059import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
060import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
061import org.apache.hadoop.hbase.io.hfile.BlockPriority;
062import org.apache.hadoop.hbase.io.hfile.BlockType;
063import org.apache.hadoop.hbase.io.hfile.CacheStats;
064import org.apache.hadoop.hbase.io.hfile.Cacheable;
065import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
066import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
067import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
068import org.apache.hadoop.hbase.io.hfile.CachedBlock;
069import org.apache.hadoop.hbase.io.hfile.HFileBlock;
070import org.apache.hadoop.hbase.nio.ByteBuff;
071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
072import org.apache.hadoop.hbase.util.HasThread;
073import org.apache.hadoop.hbase.util.IdReadWriteLock;
074import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
075import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
076import org.apache.hadoop.util.StringUtils;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
082import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
083import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
084
085/**
086 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
087 * BucketCache#ramCache and BucketCache#backingMap in order to
 * determine if a given element is in the cache. The bucket cache can keep the block data in
 * on-heap or off-heap memory ({@link ByteBufferIOEngine}) or in a file
 * ({@link FileIOEngine}).
091 *
092 * <p>Eviction is via a similar algorithm as used in
093 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
094 *
095 * <p>BucketCache can be used as mainly a block cache (see
096 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
097 * LruBlockCache to decrease CMS GC and heap fragmentation.
098 *
099 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
100 * blocks) to enlarge cache space via
101 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
102 */
103@InterfaceAudience.Private
104public class BucketCache implements BlockCache, HeapSize {
105  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);
106
107  /** Priority buckets config */
108  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
109  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
110  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
111  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
112  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
113  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
114
115  /** Priority buckets */
116  @VisibleForTesting
117  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
118  static final float DEFAULT_MULTI_FACTOR = 0.50f;
119  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
120  static final float DEFAULT_MIN_FACTOR = 0.85f;
121
122  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
123  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
124
125  // Number of blocks to clear for each of the bucket size that is full
126  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;
127
  /** Period of the statistics task, in seconds */
129  private static final int statThreadPeriod = 5 * 60;
130
131  final static int DEFAULT_WRITER_THREADS = 3;
132  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
133
134  // Store/read block data
135  final IOEngine ioEngine;
136
137  // Store the block in this map before writing it to cache
138  @VisibleForTesting
139  final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
140  // In this map, store the block's meta data like offset, length
141  @VisibleForTesting
142  ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
143
144  /**
145   * Flag if the cache is enabled or not... We shut it off if there are IO
146   * errors for some time, so that Bucket IO exceptions/errors don't bring down
147   * the HBase server.
148   */
149  private volatile boolean cacheEnabled;
150
151  /**
152   * A list of writer queues.  We have a queue per {@link WriterThread} we have running.
153   * In other words, the work adding blocks to the BucketCache is divided up amongst the
   * running WriterThreads.  It's done by taking hash of the cache key modulo queue count.
155   * WriterThread when it runs takes whatever has been recently added and 'drains' the entries
156   * to the BucketCache.  It then updates the ramCache and backingMap accordingly.
157   */
158  @VisibleForTesting
159  final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
160  @VisibleForTesting
161  final WriterThread[] writerThreads;
162
163  /** Volatile boolean to track if free space is in process or not */
164  private volatile boolean freeInProgress = false;
165  private final Lock freeSpaceLock = new ReentrantLock();
166
167  private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>();
168
169  private final LongAdder realCacheSize = new LongAdder();
170  private final LongAdder heapSize = new LongAdder();
171  /** Current number of cached elements */
172  private final LongAdder blockNumber = new LongAdder();
173
174  /** Cache access count (sequential ID) */
175  private final AtomicLong accessCount = new AtomicLong();
176
177  private static final int DEFAULT_CACHE_WAIT_TIME = 50;
  // Currently only used in tests. If the flag is false and blocks are cached very quickly,
  // bucket cache will skip some blocks when caching. If the flag is true, we
  // will wait some time for blocks to be flushed to the IOEngine when caching
181  boolean wait_when_cache = false;
182
183  private final BucketCacheStats cacheStats = new BucketCacheStats();
184
185  private final String persistencePath;
186  private final long cacheCapacity;
187  /** Approximate block size */
188  private final long blockSize;
189
190  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
191  private final int ioErrorsTolerationDuration;
192  // 1 min
193  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
194
195  // Start time of first IO error when reading or writing IO Engine, it will be
196  // reset after a successful read/write.
197  private volatile long ioErrorStartTime = -1;
198
199  /**
200   * A ReentrantReadWriteLock to lock on a particular block identified by offset.
201   * The purpose of this is to avoid freeing the block which is being read.
202   * <p>
203   * Key set of offsets in BucketCache is limited so soft reference is the best choice here.
204   */
205  @VisibleForTesting
206  final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT);
207
208  private final NavigableSet<BlockCacheKey> blocksByHFile =
209      new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() {
210        @Override
211        public int compare(BlockCacheKey a, BlockCacheKey b) {
212          int nameComparison = a.getHfileName().compareTo(b.getHfileName());
213          if (nameComparison != 0) {
214            return nameComparison;
215          }
216
217          if (a.getOffset() == b.getOffset()) {
218            return 0;
219          } else if (a.getOffset() < b.getOffset()) {
220            return -1;
221          }
222          return 1;
223        }
224      });
225
226  /** Statistics thread schedule pool (for heavy debugging, could remove) */
227  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
228    new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());
229
230  // Allocate or free space for the block
231  private BucketAllocator bucketAllocator;
232
233  /** Acceptable size of cache (no evictions if size < acceptable) */
234  private float acceptableFactor;
235
236  /** Minimum threshold of cache (when evicting, evict until size < min) */
237  private float minFactor;
238
239  /** Free this floating point factor of extra blocks when evicting. For example free the number of blocks requested * (1 + extraFreeFactor) */
240  private float extraFreeFactor;
241
242  /** Single access bucket size */
243  private float singleFactor;
244
245  /** Multiple access bucket size */
246  private float multiFactor;
247
248  /** In-memory bucket size */
249  private float memoryFactor;
250
251  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
252      int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
253      IOException {
254    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
255      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
256  }
257
258  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
259                     int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
260                     Configuration conf)
261      throws FileNotFoundException, IOException {
262    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
263    this.writerThreads = new WriterThread[writerThreadNum];
264    long blockNumCapacity = capacity / blockSize;
265    if (blockNumCapacity >= Integer.MAX_VALUE) {
266      // Enough for about 32TB of cache!
267      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
268    }
269
270    this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
271    this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
272    this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
273    this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
274    this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
275    this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
276
277    sanityCheckConfigs();
278
279    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor +
280        ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor +
281        ", memoryFactor: " + memoryFactor);
282
283    this.cacheCapacity = capacity;
284    this.persistencePath = persistencePath;
285    this.blockSize = blockSize;
286    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
287
288    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
289    for (int i = 0; i < writerThreads.length; ++i) {
290      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
291    }
292
293    assert writerQueues.size() == writerThreads.length;
294    this.ramCache = new ConcurrentHashMap<>();
295
296    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
297
298    if (ioEngine.isPersistent() && persistencePath != null) {
299      try {
300        retrieveFromFile(bucketSizes);
301      } catch (IOException ioex) {
302        LOG.error("Can't restore from file because of", ioex);
303      } catch (ClassNotFoundException cnfe) {
304        LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
305        throw new RuntimeException(cnfe);
306      }
307    }
308    final String threadName = Thread.currentThread().getName();
309    this.cacheEnabled = true;
310    for (int i = 0; i < writerThreads.length; ++i) {
311      writerThreads[i] = new WriterThread(writerQueues.get(i));
312      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
313      writerThreads[i].setDaemon(true);
314    }
315    startWriterThreads();
316
317    // Run the statistics thread periodically to print the cache statistics log
318    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
319    // every five minutes.
320    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
321        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
322    LOG.info("Started bucket cache; ioengine=" + ioEngineName +
323        ", capacity=" + StringUtils.byteDesc(capacity) +
324      ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
325        writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
326      persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
327  }
328
329  private void sanityCheckConfigs() {
330    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
331    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
332    Preconditions.checkArgument(minFactor <= acceptableFactor, MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
333    Preconditions.checkArgument(extraFreeFactor >= 0, EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0");
334    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
335    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
336    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
337    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, SINGLE_FACTOR_CONFIG_NAME + ", " +
338        MULTI_FACTOR_CONFIG_NAME + ", and " + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
339  }
340
341  /**
342   * Called by the constructor to start the writer threads. Used by tests that need to override
343   * starting the threads.
344   */
345  @VisibleForTesting
346  protected void startWriterThreads() {
347    for (WriterThread thread : writerThreads) {
348      thread.start();
349    }
350  }
351
352  @VisibleForTesting
353  boolean isCacheEnabled() {
354    return this.cacheEnabled;
355  }
356
357  @Override
358  public long getMaxSize() {
359    return this.cacheCapacity;
360  }
361
362  public String getIoEngine() {
363    return ioEngine.toString();
364  }
365
366  /**
367   * Get the IOEngine from the IO engine name
368   * @param ioEngineName
369   * @param capacity
370   * @param persistencePath
371   * @return the IOEngine
372   * @throws IOException
373   */
374  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
375      throws IOException {
376    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
377      // In order to make the usage simple, we only need the prefix 'files:' in
378      // document whether one or multiple file(s), but also support 'file:' for
379      // the compatibility
380      String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1)
381          .split(FileIOEngine.FILE_DELIMITER);
382      return new FileIOEngine(capacity, persistencePath != null, filePaths);
383    } else if (ioEngineName.startsWith("offheap")) {
384      return new ByteBufferIOEngine(capacity);
385    } else if (ioEngineName.startsWith("mmap:")) {
386      return new FileMmapEngine(ioEngineName.substring(5), capacity);
387    } else {
388      throw new IllegalArgumentException(
389          "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap");
390    }
391  }
392
393  /**
394   * Cache the block with the specified name and buffer.
395   * @param cacheKey block's cache key
396   * @param buf block buffer
397   */
398  @Override
399  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
400    cacheBlock(cacheKey, buf, false);
401  }
402
403  /**
404   * Cache the block with the specified name and buffer.
405   * @param cacheKey block's cache key
406   * @param cachedItem block buffer
407   * @param inMemory if block is in-memory
408   */
409  @Override
410  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
411    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
412  }
413
414  /**
415   * Cache the block to ramCache
416   * @param cacheKey block's cache key
417   * @param cachedItem block buffer
418   * @param inMemory if block is in-memory
419   * @param wait if true, blocking wait when queue is full
420   */
421  private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
422      boolean wait) {
423    if (cacheEnabled) {
424      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
425        if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
426          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
427        }
428      } else {
429        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
430      }
431    }
432  }
433
434  private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
435      boolean inMemory, boolean wait) {
436    if (!cacheEnabled) {
437      return;
438    }
439    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
440    // Stuff the entry into the RAM cache so it can get drained to the persistent store
441    RAMQueueEntry re =
442        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
443    /**
444     * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
445     * key in ramCache, the heap size of bucket cache need to update if replacing entry from
446     * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if
447     * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct
448     * one, then the heap size will mess up (HBASE-20789)
449     */
450    if (ramCache.putIfAbsent(cacheKey, re) != null) {
451      return;
452    }
453    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
454    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
455    boolean successfulAddition = false;
456    if (wait) {
457      try {
458        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
459      } catch (InterruptedException e) {
460        Thread.currentThread().interrupt();
461      }
462    } else {
463      successfulAddition = bq.offer(re);
464    }
465    if (!successfulAddition) {
466      ramCache.remove(cacheKey);
467      cacheStats.failInsert();
468    } else {
469      this.blockNumber.increment();
470      this.heapSize.add(cachedItem.heapSize());
471      blocksByHFile.add(cacheKey);
472    }
473  }
474
475  /**
476   * Get the buffer of the block with the specified key.
477   * @param key block's cache key
478   * @param caching true if the caller caches blocks on cache misses
479   * @param repeat Whether this is a repeat lookup for the same block
480   * @param updateCacheMetrics Whether we should update cache metrics or not
481   * @return buffer of specified cache key, or null if not in cache
482   */
483  @Override
484  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
485      boolean updateCacheMetrics) {
486    if (!cacheEnabled) {
487      return null;
488    }
489    RAMQueueEntry re = ramCache.get(key);
490    if (re != null) {
491      if (updateCacheMetrics) {
492        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
493      }
494      re.access(accessCount.incrementAndGet());
495      return re.getData();
496    }
497    BucketEntry bucketEntry = backingMap.get(key);
498    if (bucketEntry != null) {
499      long start = System.nanoTime();
500      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
501      try {
502        lock.readLock().lock();
503        // We can not read here even if backingMap does contain the given key because its offset
504        // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
505        // existence here.
506        if (bucketEntry.equals(backingMap.get(key))) {
507          // TODO : change this area - should be removed after server cells and
508          // 12295 are available
509          int len = bucketEntry.getLength();
510          if (LOG.isTraceEnabled()) {
511            LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
512          }
513          Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
514              bucketEntry.deserializerReference(this.deserialiserMap));
515          long timeTaken = System.nanoTime() - start;
516          if (updateCacheMetrics) {
517            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
518            cacheStats.ioHit(timeTaken);
519          }
520          if (cachedBlock.getMemoryType() == MemoryType.SHARED) {
521            bucketEntry.incrementRefCountAndGet();
522          }
523          bucketEntry.access(accessCount.incrementAndGet());
524          if (this.ioErrorStartTime > 0) {
525            ioErrorStartTime = -1;
526          }
527          return cachedBlock;
528        }
529      } catch (IOException ioex) {
530        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
531        checkIOErrorIsTolerated();
532      } finally {
533        lock.readLock().unlock();
534      }
535    }
536    if (!repeat && updateCacheMetrics) {
537      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
538    }
539    return null;
540  }
541
542  @VisibleForTesting
543  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
544    bucketAllocator.freeBlock(bucketEntry.offset());
545    realCacheSize.add(-1 * bucketEntry.getLength());
546    blocksByHFile.remove(cacheKey);
547    if (decrementBlockNumber) {
548      this.blockNumber.decrement();
549    }
550  }
551
552  @Override
553  public boolean evictBlock(BlockCacheKey cacheKey) {
554    return evictBlock(cacheKey, true);
555  }
556
557  // does not check for the ref count. Just tries to evict it if found in the
558  // bucket map
559  private boolean forceEvict(BlockCacheKey cacheKey) {
560    if (!cacheEnabled) {
561      return false;
562    }
563    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
564    BucketEntry bucketEntry = backingMap.get(cacheKey);
565    if (bucketEntry == null) {
566      if (removedBlock != null) {
567        cacheStats.evicted(0, cacheKey.isPrimary());
568        return true;
569      } else {
570        return false;
571      }
572    }
573    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
574    try {
575      lock.writeLock().lock();
576      if (backingMap.remove(cacheKey, bucketEntry)) {
577        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
578      } else {
579        return false;
580      }
581    } finally {
582      lock.writeLock().unlock();
583    }
584    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
585    return true;
586  }
587
588  private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
589    RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
590    if (removedBlock != null) {
591      this.blockNumber.decrement();
592      this.heapSize.add(-1 * removedBlock.getData().heapSize());
593    }
594    return removedBlock;
595  }
596
597  public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
598    if (!cacheEnabled) {
599      return false;
600    }
601    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
602    BucketEntry bucketEntry = backingMap.get(cacheKey);
603    if (bucketEntry == null) {
604      if (removedBlock != null) {
605        cacheStats.evicted(0, cacheKey.isPrimary());
606        return true;
607      } else {
608        return false;
609      }
610    }
611    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
612    try {
613      lock.writeLock().lock();
614      int refCount = bucketEntry.getRefCount();
615      if (refCount == 0) {
616        if (backingMap.remove(cacheKey, bucketEntry)) {
617          blockEvicted(cacheKey, bucketEntry, removedBlock == null);
618        } else {
619          return false;
620        }
621      } else {
622        if(!deletedBlock) {
623          if (LOG.isDebugEnabled()) {
624            LOG.debug("This block " + cacheKey + " is still referred by " + refCount
625                + " readers. Can not be freed now");
626          }
627          return false;
628        } else {
629          if (LOG.isDebugEnabled()) {
630            LOG.debug("This block " + cacheKey + " is still referred by " + refCount
631                + " readers. Can not be freed now. Hence will mark this"
632                + " for evicting at a later point");
633          }
634          bucketEntry.markForEvict();
635        }
636      }
637    } finally {
638      lock.writeLock().unlock();
639    }
640    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
641    return true;
642  }
643
644  /*
645   * Statistics thread.  Periodically output cache statistics to the log.
646   */
647  private static class StatisticsThread extends Thread {
648    private final BucketCache bucketCache;
649
650    public StatisticsThread(BucketCache bucketCache) {
651      super("BucketCacheStatsThread");
652      setDaemon(true);
653      this.bucketCache = bucketCache;
654    }
655
656    @Override
657    public void run() {
658      bucketCache.logStats();
659    }
660  }
661
662  public void logStats() {
663    long totalSize = bucketAllocator.getTotalSize();
664    long usedSize = bucketAllocator.getUsedSize();
665    long freeSize = totalSize - usedSize;
666    long cacheSize = getRealCacheSize();
667    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " +
668        "totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
669        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
670        "usedSize=" + StringUtils.byteDesc(usedSize) +", " +
671        "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " +
672        "accesses=" + cacheStats.getRequestCount() + ", " +
673        "hits=" + cacheStats.getHitCount() + ", " +
674        "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
675        "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
676        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
677          (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
678        "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
679        "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
680        "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
681          (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
682        "evictions=" + cacheStats.getEvictionCount() + ", " +
683        "evicted=" + cacheStats.getEvictedCount() + ", " +
684        "evictedPerRun=" + cacheStats.evictedPerEviction());
685    cacheStats.reset();
686  }
687
688  public long getRealCacheSize() {
689    return this.realCacheSize.sum();
690  }
691
692  private long acceptableSize() {
693    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
694  }
695
696  @VisibleForTesting
697  long getPartitionSize(float partitionFactor) {
698    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
699  }
700
701  /**
702   * Return the count of bucketSizeinfos still need free space
703   */
704  private int bucketSizesAboveThresholdCount(float minFactor) {
705    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
706    int fullCount = 0;
707    for (int i = 0; i < stats.length; i++) {
708      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
709      freeGoal = Math.max(freeGoal, 1);
710      if (stats[i].freeCount() < freeGoal) {
711        fullCount++;
712      }
713    }
714    return fullCount;
715  }
716
717  /**
718   * This method will find the buckets that are minimally occupied
719   * and are not reference counted and will free them completely
720   * without any constraint on the access times of the elements,
721   * and as a process will completely free at most the number of buckets
722   * passed, sometimes it might not due to changing refCounts
723   *
724   * @param completelyFreeBucketsNeeded number of buckets to free
725   **/
726  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
727    if (completelyFreeBucketsNeeded != 0) {
728      // First we will build a set where the offsets are reference counted, usually
729      // this set is small around O(Handler Count) unless something else is wrong
730      Set<Integer> inUseBuckets = new HashSet<Integer>();
731      for (BucketEntry entry : backingMap.values()) {
732        if (entry.getRefCount() != 0) {
733          inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
734        }
735      }
736
737      Set<Integer> candidateBuckets = bucketAllocator.getLeastFilledBuckets(
738          inUseBuckets, completelyFreeBucketsNeeded);
739      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
740        if (candidateBuckets.contains(bucketAllocator
741            .getBucketIndex(entry.getValue().offset()))) {
742          evictBlock(entry.getKey(), false);
743        }
744      }
745    }
746  }
747
748  /**
749   * Free the space if the used size reaches acceptableSize() or one size block
750   * couldn't be allocated. When freeing the space, we use the LRU algorithm and
751   * ensure there must be some blocks evicted
752   * @param why Why we are being called
753   */
754  private void freeSpace(final String why) {
755    // Ensure only one freeSpace progress at a time
756    if (!freeSpaceLock.tryLock()) {
757      return;
758    }
759    try {
760      freeInProgress = true;
761      long bytesToFreeWithoutExtra = 0;
762      // Calculate free byte for each bucketSizeinfo
763      StringBuilder msgBuffer = LOG.isDebugEnabled()? new StringBuilder(): null;
764      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
765      long[] bytesToFreeForBucket = new long[stats.length];
766      for (int i = 0; i < stats.length; i++) {
767        bytesToFreeForBucket[i] = 0;
768        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
769        freeGoal = Math.max(freeGoal, 1);
770        if (stats[i].freeCount() < freeGoal) {
771          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
772          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
773          if (msgBuffer != null) {
774            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
775              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
776          }
777        }
778      }
779      if (msgBuffer != null) {
780        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
781      }
782
783      if (bytesToFreeWithoutExtra <= 0) {
784        return;
785      }
786      long currentSize = bucketAllocator.getUsedSize();
787      long totalSize = bucketAllocator.getTotalSize();
788      if (LOG.isDebugEnabled() && msgBuffer != null) {
789        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
790          " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
791          StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize));
792      }
793
794      long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
795          * (1 + extraFreeFactor));
796
797      // Instantiate priority buckets
798      BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
799          blockSize, getPartitionSize(singleFactor));
800      BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
801          blockSize, getPartitionSize(multiFactor));
802      BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
803          blockSize, getPartitionSize(memoryFactor));
804
805      // Scan entire map putting bucket entry into appropriate bucket entry
806      // group
807      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
808        switch (bucketEntryWithKey.getValue().getPriority()) {
809          case SINGLE: {
810            bucketSingle.add(bucketEntryWithKey);
811            break;
812          }
813          case MULTI: {
814            bucketMulti.add(bucketEntryWithKey);
815            break;
816          }
817          case MEMORY: {
818            bucketMemory.add(bucketEntryWithKey);
819            break;
820          }
821        }
822      }
823
824      PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<>(3,
825          Comparator.comparingLong(BucketEntryGroup::overflow));
826
827      bucketQueue.add(bucketSingle);
828      bucketQueue.add(bucketMulti);
829      bucketQueue.add(bucketMemory);
830
831      int remainingBuckets = bucketQueue.size();
832      long bytesFreed = 0;
833
834      BucketEntryGroup bucketGroup;
835      while ((bucketGroup = bucketQueue.poll()) != null) {
836        long overflow = bucketGroup.overflow();
837        if (overflow > 0) {
838          long bucketBytesToFree = Math.min(overflow,
839              (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
840          bytesFreed += bucketGroup.free(bucketBytesToFree);
841        }
842        remainingBuckets--;
843      }
844
845      // Check and free if there are buckets that still need freeing of space
846      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
847        bucketQueue.clear();
848        remainingBuckets = 3;
849
850        bucketQueue.add(bucketSingle);
851        bucketQueue.add(bucketMulti);
852        bucketQueue.add(bucketMemory);
853
854        while ((bucketGroup = bucketQueue.poll()) != null) {
855          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
856          bytesFreed += bucketGroup.free(bucketBytesToFree);
857          remainingBuckets--;
858        }
859      }
860
861      // Even after the above free we might still need freeing because of the
862      // De-fragmentation of the buckets (also called Slab Calcification problem), i.e
863      // there might be some buckets where the occupancy is very sparse and thus are not
864      // yielding the free for the other bucket sizes, the fix for this to evict some
865      // of the buckets, we do this by evicting the buckets that are least fulled
866      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR *
867          bucketSizesAboveThresholdCount(1.0f));
868
869      if (LOG.isDebugEnabled()) {
870        long single = bucketSingle.totalSize();
871        long multi = bucketMulti.totalSize();
872        long memory = bucketMemory.totalSize();
873        if (LOG.isDebugEnabled()) {
874          LOG.debug("Bucket cache free space completed; " + "freed="
875            + StringUtils.byteDesc(bytesFreed) + ", " + "total="
876            + StringUtils.byteDesc(totalSize) + ", " + "single="
877            + StringUtils.byteDesc(single) + ", " + "multi="
878            + StringUtils.byteDesc(multi) + ", " + "memory="
879            + StringUtils.byteDesc(memory));
880        }
881      }
882
883    } catch (Throwable t) {
884      LOG.warn("Failed freeing space", t);
885    } finally {
886      cacheStats.evict();
887      freeInProgress = false;
888      freeSpaceLock.unlock();
889    }
890  }
891
892  // This handles flushing the RAM cache to IOEngine.
893  @VisibleForTesting
894  class WriterThread extends HasThread {
895    private final BlockingQueue<RAMQueueEntry> inputQueue;
896    private volatile boolean writerEnabled = true;
897
898    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
899      super("BucketCacheWriterThread");
900      this.inputQueue = queue;
901    }
902
903    // Used for test
904    @VisibleForTesting
905    void disableWriter() {
906      this.writerEnabled = false;
907    }
908
909    @Override
910    public void run() {
911      List<RAMQueueEntry> entries = new ArrayList<>();
912      try {
913        while (cacheEnabled && writerEnabled) {
914          try {
915            try {
916              // Blocks
917              entries = getRAMQueueEntries(inputQueue, entries);
918            } catch (InterruptedException ie) {
919              if (!cacheEnabled) break;
920            }
921            doDrain(entries);
922          } catch (Exception ioe) {
923            LOG.error("WriterThread encountered error", ioe);
924          }
925        }
926      } catch (Throwable t) {
927        LOG.warn("Failed doing drain", t);
928      }
929      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
930    }
931
932    /**
933     * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
934     * cache with a new block for the same cache key. there's a corner case: one thread cache a
935     * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another
936     * new block with the same cache key do the same thing for the same cache key, so if not evict
937     * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone
938     * but the bucketAllocator do not free its memory.
939     * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
940     *      cacheKey, Cacheable newBlock)
941     * @param key Block cache key
942     * @param bucketEntry Bucket entry to put into backingMap.
943     */
944    private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
945      BucketEntry previousEntry = backingMap.put(key, bucketEntry);
946      if (previousEntry != null && previousEntry != bucketEntry) {
947        ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset());
948        lock.writeLock().lock();
949        try {
950          blockEvicted(key, previousEntry, false);
951        } finally {
952          lock.writeLock().unlock();
953        }
954      }
955    }
956
957    /**
958     * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
959     * Process all that are passed in even if failure being sure to remove from ramCache else we'll
960     * never undo the references and we'll OOME.
961     * @param entries Presumes list passed in here will be processed by this invocation only. No
962     *   interference expected.
963     * @throws InterruptedException
964     */
965    @VisibleForTesting
966    void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
967      if (entries.isEmpty()) {
968        return;
969      }
970      // This method is a little hard to follow. We run through the passed in entries and for each
971      // successful add, we add a non-null BucketEntry to the below bucketEntries.  Later we must
972      // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
973      // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
974      // filling ramCache.  We do the clean up by again running through the passed in entries
975      // doing extra work when we find a non-null bucketEntries corresponding entry.
976      final int size = entries.size();
977      BucketEntry[] bucketEntries = new BucketEntry[size];
978      // Index updated inside loop if success or if we can't succeed. We retry if cache is full
979      // when we go to add an entry by going around the loop again without upping the index.
980      int index = 0;
981      while (cacheEnabled && index < size) {
982        RAMQueueEntry re = null;
983        try {
984          re = entries.get(index);
985          if (re == null) {
986            LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
987            index++;
988            continue;
989          }
990          BucketEntry bucketEntry =
991            re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
992          // Successfully added.  Up index and add bucketEntry. Clear io exceptions.
993          bucketEntries[index] = bucketEntry;
994          if (ioErrorStartTime > 0) {
995            ioErrorStartTime = -1;
996          }
997          index++;
998        } catch (BucketAllocatorException fle) {
999          LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
1000          // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
1001          bucketEntries[index] = null;
1002          index++;
1003        } catch (CacheFullException cfe) {
1004          // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
1005          if (!freeInProgress) {
1006            freeSpace("Full!");
1007          } else {
1008            Thread.sleep(50);
1009          }
1010        } catch (IOException ioex) {
1011          // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
1012          LOG.error("Failed writing to bucket cache", ioex);
1013          checkIOErrorIsTolerated();
1014        }
1015      }
1016
1017      // Make sure data pages are written on media before we update maps.
1018      try {
1019        ioEngine.sync();
1020      } catch (IOException ioex) {
1021        LOG.error("Failed syncing IO engine", ioex);
1022        checkIOErrorIsTolerated();
1023        // Since we failed sync, free the blocks in bucket allocator
1024        for (int i = 0; i < entries.size(); ++i) {
1025          if (bucketEntries[i] != null) {
1026            bucketAllocator.freeBlock(bucketEntries[i].offset());
1027            bucketEntries[i] = null;
1028          }
1029        }
1030      }
1031
1032      // Now add to backingMap if successfully added to bucket cache.  Remove from ramCache if
1033      // success or error.
1034      for (int i = 0; i < size; ++i) {
1035        BlockCacheKey key = entries.get(i).getKey();
1036        // Only add if non-null entry.
1037        if (bucketEntries[i] != null) {
1038          putIntoBackingMap(key, bucketEntries[i]);
1039        }
1040        // Always remove from ramCache even if we failed adding it to the block cache above.
1041        RAMQueueEntry ramCacheEntry = ramCache.remove(key);
1042        if (ramCacheEntry != null) {
1043          heapSize.add(-1 * entries.get(i).getData().heapSize());
1044        } else if (bucketEntries[i] != null){
1045          // Block should have already been evicted. Remove it and free space.
1046          ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
1047          try {
1048            lock.writeLock().lock();
1049            if (backingMap.remove(key, bucketEntries[i])) {
1050              blockEvicted(key, bucketEntries[i], false);
1051            }
1052          } finally {
1053            lock.writeLock().unlock();
1054          }
1055        }
1056      }
1057
1058      long used = bucketAllocator.getUsedSize();
1059      if (used > acceptableSize()) {
1060        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
1061      }
1062      return;
1063    }
1064  }
1065
1066  /**
1067   * Blocks until elements available in {@code q} then tries to grab as many as possible
1068   * before returning.
1069   * @param receptacle Where to stash the elements taken from queue. We clear before we use it
1070   *     just in case.
1071   * @param q The queue to take from.
1072   * @return {@code receptacle} laden with elements taken from the queue or empty if none found.
1073   */
1074  @VisibleForTesting
1075  static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
1076      final List<RAMQueueEntry> receptacle)
1077  throws InterruptedException {
1078    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
1079    // ok even if list grew to accommodate thousands.
1080    receptacle.clear();
1081    receptacle.add(q.take());
1082    q.drainTo(receptacle);
1083    return receptacle;
1084  }
1085
1086  private void persistToFile() throws IOException {
1087    assert !cacheEnabled;
1088    FileOutputStream fos = null;
1089    ObjectOutputStream oos = null;
1090    try {
1091      if (!ioEngine.isPersistent()) {
1092        throw new IOException("Attempt to persist non-persistent cache mappings!");
1093      }
1094      fos = new FileOutputStream(persistencePath, false);
1095      oos = new ObjectOutputStream(fos);
1096      oos.writeLong(cacheCapacity);
1097      oos.writeUTF(ioEngine.getClass().getName());
1098      oos.writeUTF(backingMap.getClass().getName());
1099      oos.writeObject(deserialiserMap);
1100      oos.writeObject(backingMap);
1101    } finally {
1102      if (oos != null) oos.close();
1103      if (fos != null) fos.close();
1104    }
1105  }
1106
1107  @SuppressWarnings("unchecked")
1108  private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
1109      ClassNotFoundException {
1110    File persistenceFile = new File(persistencePath);
1111    if (!persistenceFile.exists()) {
1112      return;
1113    }
1114    assert !cacheEnabled;
1115    FileInputStream fis = null;
1116    ObjectInputStream ois = null;
1117    try {
1118      if (!ioEngine.isPersistent())
1119        throw new IOException(
1120            "Attempt to restore non-persistent cache mappings!");
1121      fis = new FileInputStream(persistencePath);
1122      ois = new ObjectInputStream(fis);
1123      long capacitySize = ois.readLong();
1124      if (capacitySize != cacheCapacity)
1125        throw new IOException("Mismatched cache capacity:"
1126            + StringUtils.byteDesc(capacitySize) + ", expected: "
1127            + StringUtils.byteDesc(cacheCapacity));
1128      String ioclass = ois.readUTF();
1129      String mapclass = ois.readUTF();
1130      if (!ioEngine.getClass().getName().equals(ioclass))
1131        throw new IOException("Class name for IO engine mismatch: " + ioclass
1132            + ", expected:" + ioEngine.getClass().getName());
1133      if (!backingMap.getClass().getName().equals(mapclass))
1134        throw new IOException("Class name for cache map mismatch: " + mapclass
1135            + ", expected:" + backingMap.getClass().getName());
1136      UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
1137          .readObject();
1138      ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
1139          (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
1140      BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
1141          backingMapFromFile, realCacheSize);
1142      bucketAllocator = allocator;
1143      deserialiserMap = deserMap;
1144      backingMap = backingMapFromFile;
1145    } finally {
1146      if (ois != null) ois.close();
1147      if (fis != null) fis.close();
1148      if (!persistenceFile.delete()) {
1149        throw new IOException("Failed deleting persistence file "
1150            + persistenceFile.getAbsolutePath());
1151      }
1152    }
1153  }
1154
1155  /**
1156   * Check whether we tolerate IO error this time. If the duration of IOEngine
1157   * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the
1158   * cache
1159   */
1160  private void checkIOErrorIsTolerated() {
1161    long now = EnvironmentEdgeManager.currentTime();
1162    if (this.ioErrorStartTime > 0) {
1163      if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
1164        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
1165          "ms, disabling cache, please check your IOEngine");
1166        disableCache();
1167      }
1168    } else {
1169      this.ioErrorStartTime = now;
1170    }
1171  }
1172
1173  /**
1174   * Used to shut down the cache -or- turn it off in the case of something broken.
1175   */
1176  private void disableCache() {
1177    if (!cacheEnabled) return;
1178    cacheEnabled = false;
1179    ioEngine.shutdown();
1180    this.scheduleThreadPool.shutdown();
1181    for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt();
1182    this.ramCache.clear();
1183    if (!ioEngine.isPersistent() || persistencePath == null) {
1184      // If persistent ioengine and a path, we will serialize out the backingMap.
1185      this.backingMap.clear();
1186    }
1187  }
1188
1189  private void join() throws InterruptedException {
1190    for (int i = 0; i < writerThreads.length; ++i)
1191      writerThreads[i].join();
1192  }
1193
1194  @Override
1195  public void shutdown() {
1196    disableCache();
1197    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
1198        + "; path to write=" + persistencePath);
1199    if (ioEngine.isPersistent() && persistencePath != null) {
1200      try {
1201        join();
1202        persistToFile();
1203      } catch (IOException ex) {
1204        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1205      } catch (InterruptedException e) {
1206        LOG.warn("Failed to persist data on exit", e);
1207      }
1208    }
1209  }
1210
1211  @Override
1212  public CacheStats getStats() {
1213    return cacheStats;
1214  }
1215
1216  public BucketAllocator getAllocator() {
1217    return this.bucketAllocator;
1218  }
1219
1220  @Override
1221  public long heapSize() {
1222    return this.heapSize.sum();
1223  }
1224
1225  @Override
1226  public long size() {
1227    return this.realCacheSize.sum();
1228  }
1229
1230  @Override
1231  public long getCurrentDataSize() {
1232    return size();
1233  }
1234
1235  @Override
1236  public long getFreeSize() {
1237    return this.bucketAllocator.getFreeSize();
1238  }
1239
1240  @Override
1241  public long getBlockCount() {
1242    return this.blockNumber.sum();
1243  }
1244
1245  @Override
1246  public long getDataBlockCount() {
1247    return getBlockCount();
1248  }
1249
1250  @Override
1251  public long getCurrentSize() {
1252    return this.bucketAllocator.getUsedSize();
1253  }
1254
1255  /**
1256   * Evicts all blocks for a specific HFile.
1257   * <p>
1258   * This is used for evict-on-close to remove all blocks of a specific HFile.
1259   *
1260   * @return the number of blocks evicted
1261   */
1262  @Override
1263  public int evictBlocksByHfileName(String hfileName) {
1264    Set<BlockCacheKey> keySet = blocksByHFile.subSet(
1265        new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
1266        new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
1267
1268    int numEvicted = 0;
1269    for (BlockCacheKey key : keySet) {
1270      if (evictBlock(key)) {
1271          ++numEvicted;
1272      }
1273    }
1274
1275    return numEvicted;
1276  }
1277
1278  /**
1279   * Item in cache. We expect this to be where most memory goes. Java uses 8
1280   * bytes just for object headers; after this, we want to use as little as
1281   * possible - so we only use 8 bytes, but in order to do so we end up messing
1282   * around with all this Java casting stuff. Offset stored as 5 bytes that make
1283   * up the long. Doubt we'll see devices this big for ages. Offsets are divided
1284   * by 256. So 5 bytes gives us 256TB or so.
1285   */
1286  static class BucketEntry implements Serializable {
1287    private static final long serialVersionUID = -6741504807982257534L;
1288
1289    // access counter comparator, descending order
1290    static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
1291
1292      @Override
1293      public int compare(BucketEntry o1, BucketEntry o2) {
1294        return Long.compare(o2.accessCounter, o1.accessCounter);
1295      }
1296    };
1297
1298    private int offsetBase;
1299    private int length;
1300    private byte offset1;
1301    byte deserialiserIndex;
1302    private volatile long accessCounter;
1303    private BlockPriority priority;
1304
1305    /**
1306     * Time this block was cached.  Presumes we are created just before we are added to the cache.
1307     */
1308    private final long cachedTime = System.nanoTime();
1309
1310    BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
1311      setOffset(offset);
1312      this.length = length;
1313      this.accessCounter = accessCounter;
1314      if (inMemory) {
1315        this.priority = BlockPriority.MEMORY;
1316      } else {
1317        this.priority = BlockPriority.SINGLE;
1318      }
1319    }
1320
1321    long offset() { // Java has no unsigned numbers
1322      long o = ((long) offsetBase) & 0xFFFFFFFFL; //This needs the L cast otherwise it will be sign extended as a negative number.
1323      o += (((long) (offset1)) & 0xFF) << 32; //The 0xFF here does not need the L cast because it is treated as a positive int.
1324      return o << 8;
1325    }
1326
1327    private void setOffset(long value) {
1328      assert (value & 0xFF) == 0;
1329      value >>= 8;
1330      offsetBase = (int) value;
1331      offset1 = (byte) (value >> 32);
1332    }
1333
1334    public int getLength() {
1335      return length;
1336    }
1337
1338    protected CacheableDeserializer<Cacheable> deserializerReference(
1339        UniqueIndexMap<Integer> deserialiserMap) {
1340      return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
1341          .unmap(deserialiserIndex));
1342    }
1343
1344    protected void setDeserialiserReference(
1345        CacheableDeserializer<Cacheable> deserializer,
1346        UniqueIndexMap<Integer> deserialiserMap) {
1347      this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
1348          .getDeserialiserIdentifier()));
1349    }
1350
1351    /**
1352     * Block has been accessed. Update its local access counter.
1353     */
1354    public void access(long accessCounter) {
1355      this.accessCounter = accessCounter;
1356      if (this.priority == BlockPriority.SINGLE) {
1357        this.priority = BlockPriority.MULTI;
1358      }
1359    }
1360
1361    public BlockPriority getPriority() {
1362      return this.priority;
1363    }
1364
1365    public long getCachedTime() {
1366      return cachedTime;
1367    }
1368
1369    protected int getRefCount() {
1370      return 0;
1371    }
1372
1373    protected int incrementRefCountAndGet() {
1374      return 0;
1375    }
1376
1377    protected int decrementRefCountAndGet() {
1378      return 0;
1379    }
1380
1381    protected boolean isMarkedForEvict() {
1382      return false;
1383    }
1384
1385    protected void markForEvict() {
1386      // noop;
1387    }
1388  }
1389
1390  static class SharedMemoryBucketEntry extends BucketEntry {
1391    private static final long serialVersionUID = -2187147283772338481L;
1392
1393    // Set this when we were not able to forcefully evict the block
1394    private volatile boolean markedForEvict;
1395    private AtomicInteger refCount = new AtomicInteger(0);
1396
1397    SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
1398      super(offset, length, accessCounter, inMemory);
1399    }
1400
1401    @Override
1402    protected int getRefCount() {
1403      return this.refCount.get();
1404    }
1405
1406    @Override
1407    protected int incrementRefCountAndGet() {
1408      return this.refCount.incrementAndGet();
1409    }
1410
1411    @Override
1412    protected int decrementRefCountAndGet() {
1413      return this.refCount.decrementAndGet();
1414    }
1415
1416    @Override
1417    protected boolean isMarkedForEvict() {
1418      return this.markedForEvict;
1419    }
1420
1421    @Override
1422    protected void markForEvict() {
1423      this.markedForEvict = true;
1424    }
1425  }
1426
1427  /**
1428   * Used to group bucket entries into priority buckets. There will be a
1429   * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
1430   * the eviction algorithm takes the appropriate number of elements out of each
1431   * according to configuration parameters and their relative sizes.
1432   */
1433  private class BucketEntryGroup {
1434
1435    private CachedEntryQueue queue;
1436    private long totalSize = 0;
1437    private long bucketSize;
1438
1439    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1440      this.bucketSize = bucketSize;
1441      queue = new CachedEntryQueue(bytesToFree, blockSize);
1442      totalSize = 0;
1443    }
1444
1445    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1446      totalSize += block.getValue().getLength();
1447      queue.add(block);
1448    }
1449
1450    public long free(long toFree) {
1451      Map.Entry<BlockCacheKey, BucketEntry> entry;
1452      long freedBytes = 0;
1453      // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
1454      // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
1455      while ((entry = queue.pollLast()) != null) {
1456        if (evictBlock(entry.getKey(), false)) {
1457          freedBytes += entry.getValue().getLength();
1458        }
1459        if (freedBytes >= toFree) {
1460          return freedBytes;
1461        }
1462      }
1463      return freedBytes;
1464    }
1465
1466    public long overflow() {
1467      return totalSize - bucketSize;
1468    }
1469
1470    public long totalSize() {
1471      return totalSize;
1472    }
1473  }
1474
1475  /**
1476   * Block Entry stored in the memory with key,data and so on
1477   */
1478  @VisibleForTesting
1479  static class RAMQueueEntry {
1480    private BlockCacheKey key;
1481    private Cacheable data;
1482    private long accessCounter;
1483    private boolean inMemory;
1484
1485    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
1486        boolean inMemory) {
1487      this.key = bck;
1488      this.data = data;
1489      this.accessCounter = accessCounter;
1490      this.inMemory = inMemory;
1491    }
1492
1493    public Cacheable getData() {
1494      return data;
1495    }
1496
1497    public BlockCacheKey getKey() {
1498      return key;
1499    }
1500
1501    public void access(long accessCounter) {
1502      this.accessCounter = accessCounter;
1503    }
1504
1505    private BucketEntry getBucketEntry(IOEngine ioEngine, long offset, int len) {
1506      if (ioEngine.usesSharedMemory()) {
1507        if (UnsafeAvailChecker.isAvailable()) {
1508          return new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
1509        } else {
1510          return new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
1511        }
1512      } else {
1513        return new BucketEntry(offset, len, accessCounter, inMemory);
1514      }
1515    }
1516
1517    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator,
1518        final UniqueIndexMap<Integer> deserialiserMap, final LongAdder realCacheSize)
1519        throws IOException {
1520      int len = data.getSerializedLength();
1521      // This cacheable thing can't be serialized
1522      if (len == 0) {
1523        return null;
1524      }
1525      long offset = bucketAllocator.allocateBlock(len);
1526      boolean succ = false;
1527      BucketEntry bucketEntry;
1528      try {
1529        bucketEntry = getBucketEntry(ioEngine, offset, len);
1530        bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
1531        if (data instanceof HFileBlock) {
1532          // If an instance of HFileBlock, save on some allocations.
1533          HFileBlock block = (HFileBlock) data;
1534          ByteBuff sliceBuf = block.getBufferReadOnly();
1535          ByteBuffer metadata = block.getMetaData();
1536          ioEngine.write(sliceBuf, offset);
1537          ioEngine.write(metadata, offset + len - metadata.limit());
1538        } else {
1539          ByteBuffer bb = ByteBuffer.allocate(len);
1540          data.serialize(bb, true);
1541          ioEngine.write(bb, offset);
1542        }
1543        succ = true;
1544      } finally {
1545        if (!succ) {
1546          bucketAllocator.freeBlock(offset);
1547        }
1548      }
1549      realCacheSize.add(len);
1550      return bucketEntry;
1551    }
1552  }
1553
1554  /**
1555   * Only used in test
1556   * @throws InterruptedException
1557   */
1558  void stopWriterThreads() throws InterruptedException {
1559    for (WriterThread writerThread : writerThreads) {
1560      writerThread.disableWriter();
1561      writerThread.interrupt();
1562      writerThread.join();
1563    }
1564  }
1565
1566  @Override
1567  public Iterator<CachedBlock> iterator() {
1568    // Don't bother with ramcache since stuff is in here only a little while.
1569    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
1570        this.backingMap.entrySet().iterator();
1571    return new Iterator<CachedBlock>() {
1572      private final long now = System.nanoTime();
1573
1574      @Override
1575      public boolean hasNext() {
1576        return i.hasNext();
1577      }
1578
1579      @Override
1580      public CachedBlock next() {
1581        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1582        return new CachedBlock() {
1583          @Override
1584          public String toString() {
1585            return BlockCacheUtil.toString(this, now);
1586          }
1587
1588          @Override
1589          public BlockPriority getBlockPriority() {
1590            return e.getValue().getPriority();
1591          }
1592
1593          @Override
1594          public BlockType getBlockType() {
1595            // Not held by BucketEntry.  Could add it if wanted on BucketEntry creation.
1596            return null;
1597          }
1598
1599          @Override
1600          public long getOffset() {
1601            return e.getKey().getOffset();
1602          }
1603
1604          @Override
1605          public long getSize() {
1606            return e.getValue().getLength();
1607          }
1608
1609          @Override
1610          public long getCachedTime() {
1611            return e.getValue().getCachedTime();
1612          }
1613
1614          @Override
1615          public String getFilename() {
1616            return e.getKey().getHfileName();
1617          }
1618
1619          @Override
1620          public int compareTo(CachedBlock other) {
1621            int diff = this.getFilename().compareTo(other.getFilename());
1622            if (diff != 0) return diff;
1623
1624            diff = Long.compare(this.getOffset(), other.getOffset());
1625            if (diff != 0) return diff;
1626            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1627              throw new IllegalStateException("" + this.getCachedTime() + ", " +
1628                other.getCachedTime());
1629            }
1630            return Long.compare(other.getCachedTime(), this.getCachedTime());
1631          }
1632
1633          @Override
1634          public int hashCode() {
1635            return e.getKey().hashCode();
1636          }
1637
1638          @Override
1639          public boolean equals(Object obj) {
1640            if (obj instanceof CachedBlock) {
1641              CachedBlock cb = (CachedBlock)obj;
1642              return compareTo(cb) == 0;
1643            } else {
1644              return false;
1645            }
1646          }
1647        };
1648      }
1649
1650      @Override
1651      public void remove() {
1652        throw new UnsupportedOperationException();
1653      }
1654    };
1655  }
1656
1657  @Override
1658  public BlockCache[] getBlockCaches() {
1659    return null;
1660  }
1661
1662  @Override
1663  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
1664    if (block.getMemoryType() == MemoryType.SHARED) {
1665      BucketEntry bucketEntry = backingMap.get(cacheKey);
1666      if (bucketEntry != null) {
1667        int refCount = bucketEntry.decrementRefCountAndGet();
1668        if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
1669          forceEvict(cacheKey);
1670        }
1671      }
1672    }
1673  }
1674
1675  @VisibleForTesting
1676  public int getRefCount(BlockCacheKey cacheKey) {
1677    BucketEntry bucketEntry = backingMap.get(cacheKey);
1678    if (bucketEntry != null) {
1679      return bucketEntry.getRefCount();
1680    }
1681    return 0;
1682  }
1683
1684  float getAcceptableFactor() {
1685    return acceptableFactor;
1686  }
1687
1688  float getMinFactor() {
1689    return minFactor;
1690  }
1691
1692  float getExtraFreeFactor() {
1693    return extraFreeFactor;
1694  }
1695
1696  float getSingleFactor() {
1697    return singleFactor;
1698  }
1699
1700  float getMultiFactor() {
1701    return multiFactor;
1702  }
1703
1704  float getMemoryFactor() {
1705    return memoryFactor;
1706  }
1707}