001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021package org.apache.hadoop.hbase.io.hfile.bucket;
022
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.FileNotFoundException;
026import java.io.FileOutputStream;
027import java.io.IOException;
028import java.io.ObjectInputStream;
029import java.io.ObjectOutputStream;
030import java.io.Serializable;
031import java.nio.ByteBuffer;
032import java.util.ArrayList;
033import java.util.Comparator;
034import java.util.HashSet;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.NavigableSet;
039import java.util.PriorityQueue;
040import java.util.Set;
041import java.util.concurrent.ArrayBlockingQueue;
042import java.util.concurrent.BlockingQueue;
043import java.util.concurrent.ConcurrentHashMap;
044import java.util.concurrent.ConcurrentMap;
045import java.util.concurrent.ConcurrentSkipListSet;
046import java.util.concurrent.Executors;
047import java.util.concurrent.ScheduledExecutorService;
048import java.util.concurrent.TimeUnit;
049import java.util.concurrent.atomic.AtomicInteger;
050import java.util.concurrent.atomic.AtomicLong;
051import java.util.concurrent.atomic.LongAdder;
052import java.util.concurrent.locks.Lock;
053import java.util.concurrent.locks.ReentrantLock;
054import java.util.concurrent.locks.ReentrantReadWriteLock;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.hbase.HBaseConfiguration;
057import org.apache.hadoop.hbase.io.HeapSize;
058import org.apache.hadoop.hbase.io.hfile.BlockCache;
059import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
060import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
061import org.apache.hadoop.hbase.io.hfile.BlockPriority;
062import org.apache.hadoop.hbase.io.hfile.BlockType;
063import org.apache.hadoop.hbase.io.hfile.CacheStats;
064import org.apache.hadoop.hbase.io.hfile.Cacheable;
065import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
066import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
067import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
068import org.apache.hadoop.hbase.io.hfile.CachedBlock;
069import org.apache.hadoop.hbase.io.hfile.HFileBlock;
070import org.apache.hadoop.hbase.nio.ByteBuff;
071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
072import org.apache.hadoop.hbase.util.HasThread;
073import org.apache.hadoop.hbase.util.IdReadWriteLock;
074import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
075import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
076import org.apache.hadoop.util.StringUtils;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
082import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
083import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
084
085/**
086 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
087 * BucketCache#ramCache and BucketCache#backingMap in order to
088 * determine if a given element is in the cache. The bucket cache can use on-heap or
089 * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to
090 * store/read the block data.
091 *
 * <p>Eviction uses an algorithm similar to the one in
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}.
094 *
 * <p>BucketCache can be used as the main block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
 * LruBlockCache, to reduce CMS GC pressure and heap fragmentation.
098 *
 * <p>It can also be used as a secondary cache (e.g. storing blocks in a file on an SSD or
 * FusionIO device) to enlarge the cache space via
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}.
102 */
103@InterfaceAudience.Private
104public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /**
   * Configuration keys for the eviction and priority factors. The constructor reads each one with
   * Configuration#getFloat (falling back to the DEFAULT_* values below) and validates the result
   * in sanityCheckConfigs().
   */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";

  /**
   * Default shares of the single-access, multi-access and in-memory priority groups (the three
   * must add up to 1.0), plus the default min factor. The {@code @VisibleForTesting} annotation
   * below applies only to DEFAULT_SINGLE_FACTOR.
   */
  @VisibleForTesting
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  /** Defaults for the extra-free factor and the accept factor. */
  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of blocks to clear for each bucket size that is full
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /** Period, in seconds, of the statistics log task (5 minutes). */
  private static final int statThreadPeriod = 5 * 60;

  // Default writer thread count and per-writer queue length. The constructors take these values
  // as explicit arguments; presumably callers elsewhere pass these defaults — TODO confirm.
  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
133
  // Engine that stores and reads the block data; built from the ioengine name in the constructor.
  transient final IOEngine ioEngine;

  // Blocks accepted by cacheBlock but not yet written to the IOEngine. An entry is inserted here
  // before it is offered to a writer queue.
  @VisibleForTesting
  transient final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
  // Metadata of blocks stored in the IOEngine (offset, length, ...), keyed by cache key.
  @VisibleForTesting
  transient ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;

  /**
   * Whether the cache is enabled. We shut it off if there are IO errors for some time, so that
   * Bucket IO exceptions/errors don't bring down the HBase server. While it is false, caching is
   * a no-op, getBlock returns null and evictBlock returns false.
   */
  private volatile boolean cacheEnabled;

  /**
   * A list of writer queues.  We have a queue per {@link WriterThread} we have running.
   * In other words, the work adding blocks to the BucketCache is divided up amongst the
   * running WriterThreads.  It's done by taking the hash of the cache key modulo the queue count.
   * WriterThread when it runs takes whatever has been recently added and 'drains' the entries
   * to the BucketCache.  It then updates the ramCache and backingMap accordingly.
   */
  @VisibleForTesting
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  // Writer threads; the constructor builds writerThreads[i] around writerQueues.get(i).
  @VisibleForTesting
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track if free space is in process or not */
  private volatile boolean freeInProgress = false;
  // Taken with tryLock() at the start of freeSpace(); a caller that fails to get it returns
  // immediately, so only one free-space pass runs at a time.
  private transient final Lock freeSpaceLock = new ReentrantLock();

  // Passed to BucketEntry#deserializerReference when a block is read back from the IOEngine.
  // NOTE(review): presumably maps deserializer ids to the compact index kept in each BucketEntry
  // — confirm in UniqueIndexMap / BucketEntry.
  private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>();

  // Total length of blocks held in the IOEngine; blockEvicted() subtracts each evicted entry's
  // length (presumably the writer path adds it — TODO confirm in WriterThread).
  private final LongAdder realCacheSize = new LongAdder();
  // Heap size of entries in ramCache: added when an entry is queued for writing, subtracted when
  // checkRamCache() removes one.
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID); incremented to stamp entries when cached or read */
  private final AtomicLong accessCount = new AtomicLong();

  // Milliseconds a caching call waits for writer-queue space when waiting is enabled.
  private static final int DEFAULT_CACHE_WAIT_TIME = 50;
  // Used in test now. If the flag is false and the cache speed is very fast,
  // bucket cache will skip some blocks when caching. If the flag is true, we
  // will wait blocks flushed to IOEngine for some time when caching
  boolean wait_when_cache = false;

  private final BucketCacheStats cacheStats = new BucketCacheStats();

  // File from which cached state is restored at construction when the engine is persistent;
  // may be null.
  private final String persistencePath;
  // Capacity passed to the constructor; reported by getMaxSize().
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min, expressed in milliseconds
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset after a successful read/write. -1 means no IO error is currently outstanding.
  private volatile long ioErrorStartTime = -1;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset.
   * The purpose of this is to avoid freeing the block which is being read: getBlock holds the
   * read lock of an entry's offset while reading it, evictBlock holds the write lock.
   * <p>
   * Key set of offsets in BucketCache is limited so soft reference is the best choice here.
   */
  @VisibleForTesting
  transient final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT);
207
208  private final NavigableSet<BlockCacheKey> blocksByHFile =
209      new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() {
210        @Override
211        public int compare(BlockCacheKey a, BlockCacheKey b) {
212          int nameComparison = a.getHfileName().compareTo(b.getHfileName());
213          if (nameComparison != 0) {
214            return nameComparison;
215          }
216
217          if (a.getOffset() == b.getOffset()) {
218            return 0;
219          } else if (a.getOffset() < b.getOffset()) {
220            return -1;
221          }
222          return 1;
223        }
224      });
225
  /**
   * Statistics thread schedule pool (for heavy debugging, could remove). A single daemon thread
   * that the constructor uses to run StatisticsThread every statThreadPeriod seconds.
   */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /**
   * Free this floating point factor of extra blocks when evicting. For example, free the number of
   * blocks requested * (1 + extraFreeFactor).
   */
  private float extraFreeFactor;

  /** Single access bucket size */
  private float singleFactor;

  /** Multiple access bucket size */
  private float multiFactor;

  /** In-memory bucket size */
  private float memoryFactor;
251
252  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
253      int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
254      IOException {
255    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
256      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create());
257  }
258
259  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
260                     int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
261                     Configuration conf)
262      throws FileNotFoundException, IOException {
263    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
264    this.writerThreads = new WriterThread[writerThreadNum];
265    long blockNumCapacity = capacity / blockSize;
266    if (blockNumCapacity >= Integer.MAX_VALUE) {
267      // Enough for about 32TB of cache!
268      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
269    }
270
271    this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR);
272    this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR);
273    this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR);
274    this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
275    this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
276    this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
277
278    sanityCheckConfigs();
279
280    LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor +
281        ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor +
282        ", memoryFactor: " + memoryFactor);
283
284    this.cacheCapacity = capacity;
285    this.persistencePath = persistencePath;
286    this.blockSize = blockSize;
287    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
288
289    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
290    for (int i = 0; i < writerThreads.length; ++i) {
291      writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
292    }
293
294    assert writerQueues.size() == writerThreads.length;
295    this.ramCache = new ConcurrentHashMap<>();
296
297    this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
298
299    if (ioEngine.isPersistent() && persistencePath != null) {
300      try {
301        retrieveFromFile(bucketSizes);
302      } catch (IOException ioex) {
303        LOG.error("Can't restore from file because of", ioex);
304      } catch (ClassNotFoundException cnfe) {
305        LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
306        throw new RuntimeException(cnfe);
307      }
308    }
309    final String threadName = Thread.currentThread().getName();
310    this.cacheEnabled = true;
311    for (int i = 0; i < writerThreads.length; ++i) {
312      writerThreads[i] = new WriterThread(writerQueues.get(i));
313      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
314      writerThreads[i].setDaemon(true);
315    }
316    startWriterThreads();
317
318    // Run the statistics thread periodically to print the cache statistics log
319    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
320    // every five minutes.
321    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
322        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
323    LOG.info("Started bucket cache; ioengine=" + ioEngineName +
324        ", capacity=" + StringUtils.byteDesc(capacity) +
325      ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
326        writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
327      persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
328  }
329
330  private void sanityCheckConfigs() {
331    Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
332    Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
333    Preconditions.checkArgument(minFactor <= acceptableFactor, MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME);
334    Preconditions.checkArgument(extraFreeFactor >= 0, EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0");
335    Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
336    Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
337    Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0");
338    Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, SINGLE_FACTOR_CONFIG_NAME + ", " +
339        MULTI_FACTOR_CONFIG_NAME + ", and " + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0");
340  }
341
342  /**
343   * Called by the constructor to start the writer threads. Used by tests that need to override
344   * starting the threads.
345   */
346  @VisibleForTesting
347  protected void startWriterThreads() {
348    for (WriterThread thread : writerThreads) {
349      thread.start();
350    }
351  }
352
353  @VisibleForTesting
354  boolean isCacheEnabled() {
355    return this.cacheEnabled;
356  }
357
358  @Override
359  public long getMaxSize() {
360    return this.cacheCapacity;
361  }
362
363  public String getIoEngine() {
364    return ioEngine.toString();
365  }
366
367  /**
368   * Get the IOEngine from the IO engine name
369   * @param ioEngineName
370   * @param capacity
371   * @param persistencePath
372   * @return the IOEngine
373   * @throws IOException
374   */
375  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
376      throws IOException {
377    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
378      // In order to make the usage simple, we only need the prefix 'files:' in
379      // document whether one or multiple file(s), but also support 'file:' for
380      // the compatibility
381      String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1)
382          .split(FileIOEngine.FILE_DELIMITER);
383      return new FileIOEngine(capacity, persistencePath != null, filePaths);
384    } else if (ioEngineName.startsWith("offheap")) {
385      return new ByteBufferIOEngine(capacity);
386    } else if (ioEngineName.startsWith("mmap:")) {
387      return new FileMmapEngine(ioEngineName.substring(5), capacity);
388    } else {
389      throw new IllegalArgumentException(
390          "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap");
391    }
392  }
393
394  /**
395   * Cache the block with the specified name and buffer.
396   * @param cacheKey block's cache key
397   * @param buf block buffer
398   */
399  @Override
400  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
401    cacheBlock(cacheKey, buf, false);
402  }
403
404  /**
405   * Cache the block with the specified name and buffer.
406   * @param cacheKey block's cache key
407   * @param cachedItem block buffer
408   * @param inMemory if block is in-memory
409   */
410  @Override
411  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
412    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
413  }
414
415  /**
416   * Cache the block to ramCache
417   * @param cacheKey block's cache key
418   * @param cachedItem block buffer
419   * @param inMemory if block is in-memory
420   * @param wait if true, blocking wait when queue is full
421   */
422  private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
423      boolean wait) {
424    if (cacheEnabled) {
425      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
426        if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
427          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
428        }
429      } else {
430        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
431      }
432    }
433  }
434
435  private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
436      boolean inMemory, boolean wait) {
437    if (!cacheEnabled) {
438      return;
439    }
440    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
441    // Stuff the entry into the RAM cache so it can get drained to the persistent store
442    RAMQueueEntry re =
443        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
444    /**
445     * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
446     * key in ramCache, the heap size of bucket cache need to update if replacing entry from
447     * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if
448     * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct
449     * one, then the heap size will mess up (HBASE-20789)
450     */
451    if (ramCache.putIfAbsent(cacheKey, re) != null) {
452      return;
453    }
454    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
455    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
456    boolean successfulAddition = false;
457    if (wait) {
458      try {
459        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
460      } catch (InterruptedException e) {
461        Thread.currentThread().interrupt();
462      }
463    } else {
464      successfulAddition = bq.offer(re);
465    }
466    if (!successfulAddition) {
467      ramCache.remove(cacheKey);
468      cacheStats.failInsert();
469    } else {
470      this.blockNumber.increment();
471      this.heapSize.add(cachedItem.heapSize());
472      blocksByHFile.add(cacheKey);
473    }
474  }
475
476  /**
477   * Get the buffer of the block with the specified key.
478   * @param key block's cache key
479   * @param caching true if the caller caches blocks on cache misses
480   * @param repeat Whether this is a repeat lookup for the same block
481   * @param updateCacheMetrics Whether we should update cache metrics or not
482   * @return buffer of specified cache key, or null if not in cache
483   */
484  @Override
485  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
486      boolean updateCacheMetrics) {
487    if (!cacheEnabled) {
488      return null;
489    }
490    RAMQueueEntry re = ramCache.get(key);
491    if (re != null) {
492      if (updateCacheMetrics) {
493        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
494      }
495      re.access(accessCount.incrementAndGet());
496      return re.getData();
497    }
498    BucketEntry bucketEntry = backingMap.get(key);
499    if (bucketEntry != null) {
500      long start = System.nanoTime();
501      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
502      try {
503        lock.readLock().lock();
504        // We can not read here even if backingMap does contain the given key because its offset
505        // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
506        // existence here.
507        if (bucketEntry.equals(backingMap.get(key))) {
508          // TODO : change this area - should be removed after server cells and
509          // 12295 are available
510          int len = bucketEntry.getLength();
511          if (LOG.isTraceEnabled()) {
512            LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
513          }
514          Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
515              bucketEntry.deserializerReference(this.deserialiserMap));
516          long timeTaken = System.nanoTime() - start;
517          if (updateCacheMetrics) {
518            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
519            cacheStats.ioHit(timeTaken);
520          }
521          if (cachedBlock.getMemoryType() == MemoryType.SHARED) {
522            bucketEntry.incrementRefCountAndGet();
523          }
524          bucketEntry.access(accessCount.incrementAndGet());
525          if (this.ioErrorStartTime > 0) {
526            ioErrorStartTime = -1;
527          }
528          return cachedBlock;
529        }
530      } catch (IOException ioex) {
531        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
532        checkIOErrorIsTolerated();
533      } finally {
534        lock.readLock().unlock();
535      }
536    }
537    if (!repeat && updateCacheMetrics) {
538      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
539    }
540    return null;
541  }
542
543  @VisibleForTesting
544  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
545    bucketAllocator.freeBlock(bucketEntry.offset());
546    realCacheSize.add(-1 * bucketEntry.getLength());
547    blocksByHFile.remove(cacheKey);
548    if (decrementBlockNumber) {
549      this.blockNumber.decrement();
550    }
551  }
552
553  @Override
554  public boolean evictBlock(BlockCacheKey cacheKey) {
555    return evictBlock(cacheKey, true);
556  }
557
558  private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
559    RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
560    if (removedBlock != null) {
561      this.blockNumber.decrement();
562      this.heapSize.add(-1 * removedBlock.getData().heapSize());
563    }
564    return removedBlock;
565  }
566
567  public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
568    if (!cacheEnabled) {
569      return false;
570    }
571    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
572    BucketEntry bucketEntry = backingMap.get(cacheKey);
573    if (bucketEntry == null) {
574      if (removedBlock != null) {
575        cacheStats.evicted(0, cacheKey.isPrimary());
576        return true;
577      } else {
578        return false;
579      }
580    }
581    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
582    try {
583      lock.writeLock().lock();
584      int refCount = bucketEntry.getRefCount();
585      if (refCount == 0) {
586        if (backingMap.remove(cacheKey, bucketEntry)) {
587          blockEvicted(cacheKey, bucketEntry, removedBlock == null);
588        } else {
589          return false;
590        }
591      } else {
592        if(!deletedBlock) {
593          if (LOG.isDebugEnabled()) {
594            LOG.debug("This block " + cacheKey + " is still referred by " + refCount
595                + " readers. Can not be freed now");
596          }
597          return false;
598        } else {
599          if (LOG.isDebugEnabled()) {
600            LOG.debug("This block " + cacheKey + " is still referred by " + refCount
601                + " readers. Can not be freed now. Hence will mark this"
602                + " for evicting at a later point");
603          }
604          bucketEntry.markForEvict();
605        }
606      }
607    } finally {
608      lock.writeLock().unlock();
609    }
610    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
611    return true;
612  }
613
614  /*
615   * Statistics thread.  Periodically output cache statistics to the log.
616   */
617  private static class StatisticsThread extends Thread {
618    private final BucketCache bucketCache;
619
620    public StatisticsThread(BucketCache bucketCache) {
621      super("BucketCacheStatsThread");
622      setDaemon(true);
623      this.bucketCache = bucketCache;
624    }
625
626    @Override
627    public void run() {
628      bucketCache.logStats();
629    }
630  }
631
632  public void logStats() {
633    long totalSize = bucketAllocator.getTotalSize();
634    long usedSize = bucketAllocator.getUsedSize();
635    long freeSize = totalSize - usedSize;
636    long cacheSize = getRealCacheSize();
637    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " +
638        "totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
639        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
640        "usedSize=" + StringUtils.byteDesc(usedSize) +", " +
641        "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " +
642        "accesses=" + cacheStats.getRequestCount() + ", " +
643        "hits=" + cacheStats.getHitCount() + ", " +
644        "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
645        "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
646        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
647          (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
648        "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
649        "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
650        "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
651          (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
652        "evictions=" + cacheStats.getEvictionCount() + ", " +
653        "evicted=" + cacheStats.getEvictedCount() + ", " +
654        "evictedPerRun=" + cacheStats.evictedPerEviction());
655    cacheStats.reset();
656  }
657
658  public long getRealCacheSize() {
659    return this.realCacheSize.sum();
660  }
661
662  private long acceptableSize() {
663    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
664  }
665
666  @VisibleForTesting
667  long getPartitionSize(float partitionFactor) {
668    return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
669  }
670
671  /**
672   * Return the count of bucketSizeinfos still need free space
673   */
674  private int bucketSizesAboveThresholdCount(float minFactor) {
675    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
676    int fullCount = 0;
677    for (int i = 0; i < stats.length; i++) {
678      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
679      freeGoal = Math.max(freeGoal, 1);
680      if (stats[i].freeCount() < freeGoal) {
681        fullCount++;
682      }
683    }
684    return fullCount;
685  }
686
687  /**
688   * This method will find the buckets that are minimally occupied
689   * and are not reference counted and will free them completely
690   * without any constraint on the access times of the elements,
691   * and as a process will completely free at most the number of buckets
692   * passed, sometimes it might not due to changing refCounts
693   *
694   * @param completelyFreeBucketsNeeded number of buckets to free
695   **/
696  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
697    if (completelyFreeBucketsNeeded != 0) {
698      // First we will build a set where the offsets are reference counted, usually
699      // this set is small around O(Handler Count) unless something else is wrong
700      Set<Integer> inUseBuckets = new HashSet<Integer>();
701      for (BucketEntry entry : backingMap.values()) {
702        if (entry.getRefCount() != 0) {
703          inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
704        }
705      }
706
707      Set<Integer> candidateBuckets = bucketAllocator.getLeastFilledBuckets(
708          inUseBuckets, completelyFreeBucketsNeeded);
709      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
710        if (candidateBuckets.contains(bucketAllocator
711            .getBucketIndex(entry.getValue().offset()))) {
712          evictBlock(entry.getKey(), false);
713        }
714      }
715    }
716  }
717
718  /**
719   * Free the space if the used size reaches acceptableSize() or one size block
720   * couldn't be allocated. When freeing the space, we use the LRU algorithm and
721   * ensure there must be some blocks evicted
722   * @param why Why we are being called
723   */
724  private void freeSpace(final String why) {
725    // Ensure only one freeSpace progress at a time
726    if (!freeSpaceLock.tryLock()) {
727      return;
728    }
729    try {
730      freeInProgress = true;
731      long bytesToFreeWithoutExtra = 0;
732      // Calculate free byte for each bucketSizeinfo
733      StringBuilder msgBuffer = LOG.isDebugEnabled()? new StringBuilder(): null;
734      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
735      long[] bytesToFreeForBucket = new long[stats.length];
736      for (int i = 0; i < stats.length; i++) {
737        bytesToFreeForBucket[i] = 0;
738        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
739        freeGoal = Math.max(freeGoal, 1);
740        if (stats[i].freeCount() < freeGoal) {
741          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
742          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
743          if (msgBuffer != null) {
744            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
745              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
746          }
747        }
748      }
749      if (msgBuffer != null) {
750        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
751      }
752
753      if (bytesToFreeWithoutExtra <= 0) {
754        return;
755      }
756      long currentSize = bucketAllocator.getUsedSize();
757      long totalSize = bucketAllocator.getTotalSize();
758      if (LOG.isDebugEnabled() && msgBuffer != null) {
759        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
760          " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
761          StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize));
762      }
763
764      long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
765          * (1 + extraFreeFactor));
766
767      // Instantiate priority buckets
768      BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
769          blockSize, getPartitionSize(singleFactor));
770      BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
771          blockSize, getPartitionSize(multiFactor));
772      BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
773          blockSize, getPartitionSize(memoryFactor));
774
775      // Scan entire map putting bucket entry into appropriate bucket entry
776      // group
777      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
778        switch (bucketEntryWithKey.getValue().getPriority()) {
779          case SINGLE: {
780            bucketSingle.add(bucketEntryWithKey);
781            break;
782          }
783          case MULTI: {
784            bucketMulti.add(bucketEntryWithKey);
785            break;
786          }
787          case MEMORY: {
788            bucketMemory.add(bucketEntryWithKey);
789            break;
790          }
791        }
792      }
793
794      PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<>(3,
795          Comparator.comparingLong(BucketEntryGroup::overflow));
796
797      bucketQueue.add(bucketSingle);
798      bucketQueue.add(bucketMulti);
799      bucketQueue.add(bucketMemory);
800
801      int remainingBuckets = bucketQueue.size();
802      long bytesFreed = 0;
803
804      BucketEntryGroup bucketGroup;
805      while ((bucketGroup = bucketQueue.poll()) != null) {
806        long overflow = bucketGroup.overflow();
807        if (overflow > 0) {
808          long bucketBytesToFree = Math.min(overflow,
809              (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
810          bytesFreed += bucketGroup.free(bucketBytesToFree);
811        }
812        remainingBuckets--;
813      }
814
815      // Check and free if there are buckets that still need freeing of space
816      if (bucketSizesAboveThresholdCount(minFactor) > 0) {
817        bucketQueue.clear();
818        remainingBuckets = 3;
819
820        bucketQueue.add(bucketSingle);
821        bucketQueue.add(bucketMulti);
822        bucketQueue.add(bucketMemory);
823
824        while ((bucketGroup = bucketQueue.poll()) != null) {
825          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
826          bytesFreed += bucketGroup.free(bucketBytesToFree);
827          remainingBuckets--;
828        }
829      }
830
831      // Even after the above free we might still need freeing because of the
832      // De-fragmentation of the buckets (also called Slab Calcification problem), i.e
833      // there might be some buckets where the occupancy is very sparse and thus are not
834      // yielding the free for the other bucket sizes, the fix for this to evict some
835      // of the buckets, we do this by evicting the buckets that are least fulled
836      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR *
837          bucketSizesAboveThresholdCount(1.0f));
838
839      if (LOG.isDebugEnabled()) {
840        long single = bucketSingle.totalSize();
841        long multi = bucketMulti.totalSize();
842        long memory = bucketMemory.totalSize();
843        if (LOG.isDebugEnabled()) {
844          LOG.debug("Bucket cache free space completed; " + "freed="
845            + StringUtils.byteDesc(bytesFreed) + ", " + "total="
846            + StringUtils.byteDesc(totalSize) + ", " + "single="
847            + StringUtils.byteDesc(single) + ", " + "multi="
848            + StringUtils.byteDesc(multi) + ", " + "memory="
849            + StringUtils.byteDesc(memory));
850        }
851      }
852
853    } catch (Throwable t) {
854      LOG.warn("Failed freeing space", t);
855    } finally {
856      cacheStats.evict();
857      freeInProgress = false;
858      freeSpaceLock.unlock();
859    }
860  }
861
862  // This handles flushing the RAM cache to IOEngine.
863  @VisibleForTesting
864  class WriterThread extends HasThread {
865    private final BlockingQueue<RAMQueueEntry> inputQueue;
866    private volatile boolean writerEnabled = true;
867
868    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
869      super("BucketCacheWriterThread");
870      this.inputQueue = queue;
871    }
872
873    // Used for test
874    @VisibleForTesting
875    void disableWriter() {
876      this.writerEnabled = false;
877    }
878
879    @Override
880    public void run() {
881      List<RAMQueueEntry> entries = new ArrayList<>();
882      try {
883        while (cacheEnabled && writerEnabled) {
884          try {
885            try {
886              // Blocks
887              entries = getRAMQueueEntries(inputQueue, entries);
888            } catch (InterruptedException ie) {
889              if (!cacheEnabled) break;
890            }
891            doDrain(entries);
892          } catch (Exception ioe) {
893            LOG.error("WriterThread encountered error", ioe);
894          }
895        }
896      } catch (Throwable t) {
897        LOG.warn("Failed doing drain", t);
898      }
899      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
900    }
901
902    /**
903     * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
904     * cache with a new block for the same cache key. there's a corner case: one thread cache a
905     * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another
906     * new block with the same cache key do the same thing for the same cache key, so if not evict
907     * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone
908     * but the bucketAllocator do not free its memory.
909     * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
910     *      cacheKey, Cacheable newBlock)
911     * @param key Block cache key
912     * @param bucketEntry Bucket entry to put into backingMap.
913     */
914    private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
915      BucketEntry previousEntry = backingMap.put(key, bucketEntry);
916      if (previousEntry != null && previousEntry != bucketEntry) {
917        ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset());
918        lock.writeLock().lock();
919        try {
920          blockEvicted(key, previousEntry, false);
921        } finally {
922          lock.writeLock().unlock();
923        }
924      }
925    }
926
927    /**
928     * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
929     * Process all that are passed in even if failure being sure to remove from ramCache else we'll
930     * never undo the references and we'll OOME.
931     * @param entries Presumes list passed in here will be processed by this invocation only. No
932     *   interference expected.
933     * @throws InterruptedException
934     */
935    @VisibleForTesting
936    void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
937      if (entries.isEmpty()) {
938        return;
939      }
940      // This method is a little hard to follow. We run through the passed in entries and for each
941      // successful add, we add a non-null BucketEntry to the below bucketEntries.  Later we must
942      // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
943      // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
944      // filling ramCache.  We do the clean up by again running through the passed in entries
945      // doing extra work when we find a non-null bucketEntries corresponding entry.
946      final int size = entries.size();
947      BucketEntry[] bucketEntries = new BucketEntry[size];
948      // Index updated inside loop if success or if we can't succeed. We retry if cache is full
949      // when we go to add an entry by going around the loop again without upping the index.
950      int index = 0;
951      while (cacheEnabled && index < size) {
952        RAMQueueEntry re = null;
953        try {
954          re = entries.get(index);
955          if (re == null) {
956            LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
957            index++;
958            continue;
959          }
960          BucketEntry bucketEntry =
961            re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
962          // Successfully added.  Up index and add bucketEntry. Clear io exceptions.
963          bucketEntries[index] = bucketEntry;
964          if (ioErrorStartTime > 0) {
965            ioErrorStartTime = -1;
966          }
967          index++;
968        } catch (BucketAllocatorException fle) {
969          LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
970          // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
971          bucketEntries[index] = null;
972          index++;
973        } catch (CacheFullException cfe) {
974          // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
975          if (!freeInProgress) {
976            freeSpace("Full!");
977          } else {
978            Thread.sleep(50);
979          }
980        } catch (IOException ioex) {
981          // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
982          LOG.error("Failed writing to bucket cache", ioex);
983          checkIOErrorIsTolerated();
984        }
985      }
986
987      // Make sure data pages are written on media before we update maps.
988      try {
989        ioEngine.sync();
990      } catch (IOException ioex) {
991        LOG.error("Failed syncing IO engine", ioex);
992        checkIOErrorIsTolerated();
993        // Since we failed sync, free the blocks in bucket allocator
994        for (int i = 0; i < entries.size(); ++i) {
995          if (bucketEntries[i] != null) {
996            bucketAllocator.freeBlock(bucketEntries[i].offset());
997            bucketEntries[i] = null;
998          }
999        }
1000      }
1001
1002      // Now add to backingMap if successfully added to bucket cache.  Remove from ramCache if
1003      // success or error.
1004      for (int i = 0; i < size; ++i) {
1005        BlockCacheKey key = entries.get(i).getKey();
1006        // Only add if non-null entry.
1007        if (bucketEntries[i] != null) {
1008          putIntoBackingMap(key, bucketEntries[i]);
1009        }
1010        // Always remove from ramCache even if we failed adding it to the block cache above.
1011        RAMQueueEntry ramCacheEntry = ramCache.remove(key);
1012        if (ramCacheEntry != null) {
1013          heapSize.add(-1 * entries.get(i).getData().heapSize());
1014        } else if (bucketEntries[i] != null){
1015          // Block should have already been evicted. Remove it and free space.
1016          ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
1017          try {
1018            lock.writeLock().lock();
1019            int refCount = bucketEntries[i].getRefCount();
1020            if (refCount == 0) {
1021              if (backingMap.remove(key, bucketEntries[i])) {
1022                blockEvicted(key, bucketEntries[i], false);
1023              } else {
1024                bucketEntries[i].markForEvict();
1025              }
1026            } else {
1027              bucketEntries[i].markForEvict();
1028            }
1029          } finally {
1030            lock.writeLock().unlock();
1031          }
1032        }
1033      }
1034
1035      long used = bucketAllocator.getUsedSize();
1036      if (used > acceptableSize()) {
1037        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
1038      }
1039      return;
1040    }
1041  }
1042
1043  /**
1044   * Blocks until elements available in {@code q} then tries to grab as many as possible
1045   * before returning.
1046   * @param receptacle Where to stash the elements taken from queue. We clear before we use it
1047   *     just in case.
1048   * @param q The queue to take from.
1049   * @return {@code receptacle} laden with elements taken from the queue or empty if none found.
1050   */
1051  @VisibleForTesting
1052  static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
1053      final List<RAMQueueEntry> receptacle)
1054  throws InterruptedException {
1055    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
1056    // ok even if list grew to accommodate thousands.
1057    receptacle.clear();
1058    receptacle.add(q.take());
1059    q.drainTo(receptacle);
1060    return receptacle;
1061  }
1062
1063  private void persistToFile() throws IOException {
1064    assert !cacheEnabled;
1065    FileOutputStream fos = null;
1066    ObjectOutputStream oos = null;
1067    try {
1068      if (!ioEngine.isPersistent()) {
1069        throw new IOException("Attempt to persist non-persistent cache mappings!");
1070      }
1071      fos = new FileOutputStream(persistencePath, false);
1072      oos = new ObjectOutputStream(fos);
1073      oos.writeLong(cacheCapacity);
1074      oos.writeUTF(ioEngine.getClass().getName());
1075      oos.writeUTF(backingMap.getClass().getName());
1076      oos.writeObject(deserialiserMap);
1077      oos.writeObject(backingMap);
1078    } finally {
1079      if (oos != null) oos.close();
1080      if (fos != null) fos.close();
1081    }
1082  }
1083
1084  @SuppressWarnings("unchecked")
1085  private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
1086      ClassNotFoundException {
1087    File persistenceFile = new File(persistencePath);
1088    if (!persistenceFile.exists()) {
1089      return;
1090    }
1091    assert !cacheEnabled;
1092    FileInputStream fis = null;
1093    ObjectInputStream ois = null;
1094    try {
1095      if (!ioEngine.isPersistent())
1096        throw new IOException(
1097            "Attempt to restore non-persistent cache mappings!");
1098      fis = new FileInputStream(persistencePath);
1099      ois = new ObjectInputStream(fis);
1100      long capacitySize = ois.readLong();
1101      if (capacitySize != cacheCapacity)
1102        throw new IOException("Mismatched cache capacity:"
1103            + StringUtils.byteDesc(capacitySize) + ", expected: "
1104            + StringUtils.byteDesc(cacheCapacity));
1105      String ioclass = ois.readUTF();
1106      String mapclass = ois.readUTF();
1107      if (!ioEngine.getClass().getName().equals(ioclass))
1108        throw new IOException("Class name for IO engine mismatch: " + ioclass
1109            + ", expected:" + ioEngine.getClass().getName());
1110      if (!backingMap.getClass().getName().equals(mapclass))
1111        throw new IOException("Class name for cache map mismatch: " + mapclass
1112            + ", expected:" + backingMap.getClass().getName());
1113      UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
1114          .readObject();
1115      ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
1116          (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
1117      BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
1118          backingMapFromFile, realCacheSize);
1119      bucketAllocator = allocator;
1120      deserialiserMap = deserMap;
1121      backingMap = backingMapFromFile;
1122    } finally {
1123      if (ois != null) ois.close();
1124      if (fis != null) fis.close();
1125      if (!persistenceFile.delete()) {
1126        throw new IOException("Failed deleting persistence file "
1127            + persistenceFile.getAbsolutePath());
1128      }
1129    }
1130  }
1131
1132  /**
1133   * Check whether we tolerate IO error this time. If the duration of IOEngine
1134   * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the
1135   * cache
1136   */
1137  private void checkIOErrorIsTolerated() {
1138    long now = EnvironmentEdgeManager.currentTime();
1139    if (this.ioErrorStartTime > 0) {
1140      if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
1141        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
1142          "ms, disabling cache, please check your IOEngine");
1143        disableCache();
1144      }
1145    } else {
1146      this.ioErrorStartTime = now;
1147    }
1148  }
1149
1150  /**
1151   * Used to shut down the cache -or- turn it off in the case of something broken.
1152   */
1153  private void disableCache() {
1154    if (!cacheEnabled) return;
1155    cacheEnabled = false;
1156    ioEngine.shutdown();
1157    this.scheduleThreadPool.shutdown();
1158    for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt();
1159    this.ramCache.clear();
1160    if (!ioEngine.isPersistent() || persistencePath == null) {
1161      // If persistent ioengine and a path, we will serialize out the backingMap.
1162      this.backingMap.clear();
1163    }
1164  }
1165
1166  private void join() throws InterruptedException {
1167    for (int i = 0; i < writerThreads.length; ++i)
1168      writerThreads[i].join();
1169  }
1170
1171  @Override
1172  public void shutdown() {
1173    disableCache();
1174    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
1175        + "; path to write=" + persistencePath);
1176    if (ioEngine.isPersistent() && persistencePath != null) {
1177      try {
1178        join();
1179        persistToFile();
1180      } catch (IOException ex) {
1181        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1182      } catch (InterruptedException e) {
1183        LOG.warn("Failed to persist data on exit", e);
1184      }
1185    }
1186  }
1187
1188  @Override
1189  public CacheStats getStats() {
1190    return cacheStats;
1191  }
1192
1193  public BucketAllocator getAllocator() {
1194    return this.bucketAllocator;
1195  }
1196
1197  @Override
1198  public long heapSize() {
1199    return this.heapSize.sum();
1200  }
1201
1202  @Override
1203  public long size() {
1204    return this.realCacheSize.sum();
1205  }
1206
1207  @Override
1208  public long getCurrentDataSize() {
1209    return size();
1210  }
1211
1212  @Override
1213  public long getFreeSize() {
1214    return this.bucketAllocator.getFreeSize();
1215  }
1216
1217  @Override
1218  public long getBlockCount() {
1219    return this.blockNumber.sum();
1220  }
1221
1222  @Override
1223  public long getDataBlockCount() {
1224    return getBlockCount();
1225  }
1226
1227  @Override
1228  public long getCurrentSize() {
1229    return this.bucketAllocator.getUsedSize();
1230  }
1231
1232  /**
1233   * Evicts all blocks for a specific HFile.
1234   * <p>
1235   * This is used for evict-on-close to remove all blocks of a specific HFile.
1236   *
1237   * @return the number of blocks evicted
1238   */
1239  @Override
1240  public int evictBlocksByHfileName(String hfileName) {
1241    Set<BlockCacheKey> keySet = blocksByHFile.subSet(
1242        new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
1243        new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
1244
1245    int numEvicted = 0;
1246    for (BlockCacheKey key : keySet) {
1247      if (evictBlock(key)) {
1248          ++numEvicted;
1249      }
1250    }
1251
1252    return numEvicted;
1253  }
1254
1255  /**
1256   * Item in cache. We expect this to be where most memory goes. Java uses 8
1257   * bytes just for object headers; after this, we want to use as little as
1258   * possible - so we only use 8 bytes, but in order to do so we end up messing
1259   * around with all this Java casting stuff. Offset stored as 5 bytes that make
1260   * up the long. Doubt we'll see devices this big for ages. Offsets are divided
1261   * by 256. So 5 bytes gives us 256TB or so.
1262   */
1263  static class BucketEntry implements Serializable {
1264    private static final long serialVersionUID = -6741504807982257534L;
1265
1266    // access counter comparator, descending order
1267    static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
1268
1269      @Override
1270      public int compare(BucketEntry o1, BucketEntry o2) {
1271        return Long.compare(o2.accessCounter, o1.accessCounter);
1272      }
1273    };
1274
1275    private int offsetBase;
1276    private int length;
1277    private byte offset1;
1278    byte deserialiserIndex;
1279    private volatile long accessCounter;
1280    private BlockPriority priority;
1281
1282    /**
1283     * Time this block was cached.  Presumes we are created just before we are added to the cache.
1284     */
1285    private final long cachedTime = System.nanoTime();
1286
1287    BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
1288      setOffset(offset);
1289      this.length = length;
1290      this.accessCounter = accessCounter;
1291      if (inMemory) {
1292        this.priority = BlockPriority.MEMORY;
1293      } else {
1294        this.priority = BlockPriority.SINGLE;
1295      }
1296    }
1297
1298    long offset() { // Java has no unsigned numbers
1299      long o = ((long) offsetBase) & 0xFFFFFFFFL; //This needs the L cast otherwise it will be sign extended as a negative number.
1300      o += (((long) (offset1)) & 0xFF) << 32; //The 0xFF here does not need the L cast because it is treated as a positive int.
1301      return o << 8;
1302    }
1303
1304    private void setOffset(long value) {
1305      assert (value & 0xFF) == 0;
1306      value >>= 8;
1307      offsetBase = (int) value;
1308      offset1 = (byte) (value >> 32);
1309    }
1310
1311    public int getLength() {
1312      return length;
1313    }
1314
1315    protected CacheableDeserializer<Cacheable> deserializerReference(
1316        UniqueIndexMap<Integer> deserialiserMap) {
1317      return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
1318          .unmap(deserialiserIndex));
1319    }
1320
1321    protected void setDeserialiserReference(
1322        CacheableDeserializer<Cacheable> deserializer,
1323        UniqueIndexMap<Integer> deserialiserMap) {
1324      this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
1325          .getDeserialiserIdentifier()));
1326    }
1327
1328    /**
1329     * Block has been accessed. Update its local access counter.
1330     */
1331    public void access(long accessCounter) {
1332      this.accessCounter = accessCounter;
1333      if (this.priority == BlockPriority.SINGLE) {
1334        this.priority = BlockPriority.MULTI;
1335      }
1336    }
1337
1338    public BlockPriority getPriority() {
1339      return this.priority;
1340    }
1341
1342    public long getCachedTime() {
1343      return cachedTime;
1344    }
1345
1346    protected int getRefCount() {
1347      return 0;
1348    }
1349
1350    protected int incrementRefCountAndGet() {
1351      return 0;
1352    }
1353
1354    protected int decrementRefCountAndGet() {
1355      return 0;
1356    }
1357
1358    protected boolean isMarkedForEvict() {
1359      return false;
1360    }
1361
1362    protected void markForEvict() {
1363      // noop;
1364    }
1365  }
1366
1367  static class SharedMemoryBucketEntry extends BucketEntry {
1368    private static final long serialVersionUID = -2187147283772338481L;
1369
1370    // Set this when we were not able to forcefully evict the block
1371    private volatile boolean markedForEvict;
1372    private AtomicInteger refCount = new AtomicInteger(0);
1373
1374    SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
1375      super(offset, length, accessCounter, inMemory);
1376    }
1377
1378    @Override
1379    protected int getRefCount() {
1380      return this.refCount.get();
1381    }
1382
1383    @Override
1384    protected int incrementRefCountAndGet() {
1385      return this.refCount.incrementAndGet();
1386    }
1387
1388    @Override
1389    protected int decrementRefCountAndGet() {
1390      return this.refCount.decrementAndGet();
1391    }
1392
1393    @Override
1394    protected boolean isMarkedForEvict() {
1395      return this.markedForEvict;
1396    }
1397
1398    @Override
1399    protected void markForEvict() {
1400      this.markedForEvict = true;
1401    }
1402  }
1403
1404  /**
1405   * Used to group bucket entries into priority buckets. There will be a
1406   * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
1407   * the eviction algorithm takes the appropriate number of elements out of each
1408   * according to configuration parameters and their relative sizes.
1409   */
1410  private class BucketEntryGroup {
1411
1412    private CachedEntryQueue queue;
1413    private long totalSize = 0;
1414    private long bucketSize;
1415
1416    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1417      this.bucketSize = bucketSize;
1418      queue = new CachedEntryQueue(bytesToFree, blockSize);
1419      totalSize = 0;
1420    }
1421
1422    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1423      totalSize += block.getValue().getLength();
1424      queue.add(block);
1425    }
1426
1427    public long free(long toFree) {
1428      Map.Entry<BlockCacheKey, BucketEntry> entry;
1429      long freedBytes = 0;
1430      // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
1431      // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
1432      while ((entry = queue.pollLast()) != null) {
1433        if (evictBlock(entry.getKey(), false)) {
1434          freedBytes += entry.getValue().getLength();
1435        }
1436        if (freedBytes >= toFree) {
1437          return freedBytes;
1438        }
1439      }
1440      return freedBytes;
1441    }
1442
1443    public long overflow() {
1444      return totalSize - bucketSize;
1445    }
1446
1447    public long totalSize() {
1448      return totalSize;
1449    }
1450  }
1451
1452  /**
1453   * Block Entry stored in the memory with key,data and so on
1454   */
1455  @VisibleForTesting
1456  static class RAMQueueEntry {
1457    private BlockCacheKey key;
1458    private Cacheable data;
1459    private long accessCounter;
1460    private boolean inMemory;
1461
1462    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
1463        boolean inMemory) {
1464      this.key = bck;
1465      this.data = data;
1466      this.accessCounter = accessCounter;
1467      this.inMemory = inMemory;
1468    }
1469
1470    public Cacheable getData() {
1471      return data;
1472    }
1473
1474    public BlockCacheKey getKey() {
1475      return key;
1476    }
1477
1478    public void access(long accessCounter) {
1479      this.accessCounter = accessCounter;
1480    }
1481
1482    private BucketEntry getBucketEntry(IOEngine ioEngine, long offset, int len) {
1483      if (ioEngine.usesSharedMemory()) {
1484        if (UnsafeAvailChecker.isAvailable()) {
1485          return new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
1486        } else {
1487          return new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
1488        }
1489      } else {
1490        return new BucketEntry(offset, len, accessCounter, inMemory);
1491      }
1492    }
1493
1494    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator,
1495        final UniqueIndexMap<Integer> deserialiserMap, final LongAdder realCacheSize)
1496        throws IOException {
1497      int len = data.getSerializedLength();
1498      // This cacheable thing can't be serialized
1499      if (len == 0) {
1500        return null;
1501      }
1502      long offset = bucketAllocator.allocateBlock(len);
1503      boolean succ = false;
1504      BucketEntry bucketEntry;
1505      try {
1506        bucketEntry = getBucketEntry(ioEngine, offset, len);
1507        bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
1508        if (data instanceof HFileBlock) {
1509          // If an instance of HFileBlock, save on some allocations.
1510          HFileBlock block = (HFileBlock) data;
1511          ByteBuff sliceBuf = block.getBufferReadOnly();
1512          ByteBuffer metadata = block.getMetaData();
1513          ioEngine.write(sliceBuf, offset);
1514          ioEngine.write(metadata, offset + len - metadata.limit());
1515        } else {
1516          ByteBuffer bb = ByteBuffer.allocate(len);
1517          data.serialize(bb, true);
1518          ioEngine.write(bb, offset);
1519        }
1520        succ = true;
1521      } finally {
1522        if (!succ) {
1523          bucketAllocator.freeBlock(offset);
1524        }
1525      }
1526      realCacheSize.add(len);
1527      return bucketEntry;
1528    }
1529  }
1530
1531  /**
1532   * Only used in test
1533   * @throws InterruptedException
1534   */
1535  void stopWriterThreads() throws InterruptedException {
1536    for (WriterThread writerThread : writerThreads) {
1537      writerThread.disableWriter();
1538      writerThread.interrupt();
1539      writerThread.join();
1540    }
1541  }
1542
1543  @Override
1544  public Iterator<CachedBlock> iterator() {
1545    // Don't bother with ramcache since stuff is in here only a little while.
1546    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
1547        this.backingMap.entrySet().iterator();
1548    return new Iterator<CachedBlock>() {
1549      private final long now = System.nanoTime();
1550
1551      @Override
1552      public boolean hasNext() {
1553        return i.hasNext();
1554      }
1555
1556      @Override
1557      public CachedBlock next() {
1558        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1559        return new CachedBlock() {
1560          @Override
1561          public String toString() {
1562            return BlockCacheUtil.toString(this, now);
1563          }
1564
1565          @Override
1566          public BlockPriority getBlockPriority() {
1567            return e.getValue().getPriority();
1568          }
1569
1570          @Override
1571          public BlockType getBlockType() {
1572            // Not held by BucketEntry.  Could add it if wanted on BucketEntry creation.
1573            return null;
1574          }
1575
1576          @Override
1577          public long getOffset() {
1578            return e.getKey().getOffset();
1579          }
1580
1581          @Override
1582          public long getSize() {
1583            return e.getValue().getLength();
1584          }
1585
1586          @Override
1587          public long getCachedTime() {
1588            return e.getValue().getCachedTime();
1589          }
1590
1591          @Override
1592          public String getFilename() {
1593            return e.getKey().getHfileName();
1594          }
1595
1596          @Override
1597          public int compareTo(CachedBlock other) {
1598            int diff = this.getFilename().compareTo(other.getFilename());
1599            if (diff != 0) return diff;
1600
1601            diff = Long.compare(this.getOffset(), other.getOffset());
1602            if (diff != 0) return diff;
1603            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1604              throw new IllegalStateException("" + this.getCachedTime() + ", " +
1605                other.getCachedTime());
1606            }
1607            return Long.compare(other.getCachedTime(), this.getCachedTime());
1608          }
1609
1610          @Override
1611          public int hashCode() {
1612            return e.getKey().hashCode();
1613          }
1614
1615          @Override
1616          public boolean equals(Object obj) {
1617            if (obj instanceof CachedBlock) {
1618              CachedBlock cb = (CachedBlock)obj;
1619              return compareTo(cb) == 0;
1620            } else {
1621              return false;
1622            }
1623          }
1624        };
1625      }
1626
1627      @Override
1628      public void remove() {
1629        throw new UnsupportedOperationException();
1630      }
1631    };
1632  }
1633
1634  @Override
1635  public BlockCache[] getBlockCaches() {
1636    return null;
1637  }
1638
1639  @Override
1640  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
1641    if (block.getMemoryType() == MemoryType.SHARED) {
1642      BucketEntry bucketEntry = backingMap.get(cacheKey);
1643      if (bucketEntry != null) {
1644        int refCount = bucketEntry.decrementRefCountAndGet();
1645        if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
1646          evictBlock(cacheKey);
1647        }
1648      }
1649    }
1650  }
1651
1652  @VisibleForTesting
1653  public int getRefCount(BlockCacheKey cacheKey) {
1654    BucketEntry bucketEntry = backingMap.get(cacheKey);
1655    if (bucketEntry != null) {
1656      return bucketEntry.getRefCount();
1657    }
1658    return 0;
1659  }
1660
1661  float getAcceptableFactor() {
1662    return acceptableFactor;
1663  }
1664
1665  float getMinFactor() {
1666    return minFactor;
1667  }
1668
1669  float getExtraFreeFactor() {
1670    return extraFreeFactor;
1671  }
1672
1673  float getSingleFactor() {
1674    return singleFactor;
1675  }
1676
1677  float getMultiFactor() {
1678    return multiFactor;
1679  }
1680
1681  float getMemoryFactor() {
1682    return memoryFactor;
1683  }
1684}