View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   *
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   */
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.io.File;
24  import java.io.FileInputStream;
25  import java.io.FileNotFoundException;
26  import java.io.FileOutputStream;
27  import java.io.IOException;
28  import java.io.ObjectInputStream;
29  import java.io.ObjectOutputStream;
30  import java.io.Serializable;
31  import java.nio.ByteBuffer;
32  import java.util.ArrayList;
33  import java.util.Comparator;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.PriorityQueue;
38  import java.util.Set;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.BlockingQueue;
41  import java.util.concurrent.ConcurrentHashMap;
42  import java.util.concurrent.ConcurrentMap;
43  import java.util.concurrent.Executors;
44  import java.util.concurrent.ScheduledExecutorService;
45  import java.util.concurrent.TimeUnit;
46  import java.util.concurrent.atomic.AtomicInteger;
47  import java.util.concurrent.atomic.AtomicLong;
48  import java.util.concurrent.locks.Lock;
49  import java.util.concurrent.locks.ReentrantLock;
50  
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.hbase.classification.InterfaceAudience;
54  import org.apache.hadoop.hbase.io.HeapSize;
55  import org.apache.hadoop.hbase.io.hfile.BlockCache;
56  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
57  import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
58  import org.apache.hadoop.hbase.io.hfile.BlockPriority;
59  import org.apache.hadoop.hbase.io.hfile.BlockType;
60  import org.apache.hadoop.hbase.io.hfile.CacheStats;
61  import org.apache.hadoop.hbase.io.hfile.Cacheable;
62  import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
63  import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
64  import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
65  import org.apache.hadoop.hbase.io.hfile.CachedBlock;
66  import org.apache.hadoop.hbase.io.hfile.HFileBlock;
67  import org.apache.hadoop.hbase.nio.ByteBuff;
68  import org.apache.hadoop.hbase.util.ConcurrentIndex;
69  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
70  import org.apache.hadoop.hbase.util.HasThread;
71  import org.apache.hadoop.hbase.util.IdLock;
72  import org.apache.hadoop.hbase.util.Pair;
73  import org.apache.hadoop.util.StringUtils;
74  
75  import com.google.common.annotations.VisibleForTesting;
76  import com.google.common.collect.ImmutableList;
77  import com.google.common.util.concurrent.ThreadFactoryBuilder;
78  
79  /**
80   * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
81   * {@link BucketCache#ramCache} and {@link BucketCache#backingMap} in order to
82   * determine if a given element is in the cache. The bucket cache can use on-heap or
83   * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to
84   * store/read the block data.
85   *
86   * <p>Eviction is via a similar algorithm as used in
87   * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
88   *
89   * <p>BucketCache can be used as mainly a block cache (see
90   * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with 
91   * LruBlockCache to decrease CMS GC and heap fragmentation.
92   *
93   * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
94   * blocks) to enlarge cache space via
95   * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
96   */
97  @InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Log LOG = LogFactory.getLog(BucketCache.class);

  /** Priority buckets */
  // Fraction of the cache devoted to each access-priority group; used by
  // singleSize()/multiSize()/memorySize() to compute per-group eviction targets.
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  // Extra fraction freed beyond the computed need during freeSpace().
  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;

  // Used-size fraction at which the cache is considered full (see acceptableSize()).
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
  // Fraction of capacity freeSpace() tries to free down to.
  private static final float DEFAULT_MIN_FACTOR = 0.85f;

  /** Statistics thread */
  // Period of the statistics logging task, in seconds (five minutes).
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  // Store/read block data
  final IOEngine ioEngine;

  // Store the block in this map before writing it to cache
  @VisibleForTesting
  final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
  // In this map, store the block's meta data like offset, length
  @VisibleForTesting
  ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;

  /**
   * Flag if the cache is enabled or not... We shut it off if there are IO
   * errors for some time, so that Bucket IO exceptions/errors don't bring down
   * the HBase server.
   */
  private volatile boolean cacheEnabled;

  /**
   * A list of writer queues.  We have a queue per {@link WriterThread} we have running.
   * In other words, the work adding blocks to the BucketCache is divided up amongst the
   * running WriterThreads.  Its done by taking hash of the cache key modulo queue count.
   * WriterThread when it runs takes whatever has been recently added and 'drains' the entries
   * to the BucketCache.  It then updates the ramCache and backingMap accordingly.
   */
  @VisibleForTesting
  final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
      new ArrayList<BlockingQueue<RAMQueueEntry>>();
  @VisibleForTesting
  final WriterThread[] writerThreads;

  /** Volatile boolean to track if free space is in process or not */
  private volatile boolean freeInProgress = false;
  // Guards freeSpace(); tryLock() there ensures only one free pass runs at a time.
  private final Lock freeSpaceLock = new ReentrantLock();

  // Maps deserializer ids to compact indexes stored with each BucketEntry.
  private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();

  // Bytes currently occupied in the IOEngine (updated on write/evict).
  private final AtomicLong realCacheSize = new AtomicLong(0);
  // Heap bytes held by blocks still sitting in ramCache awaiting drain.
  private final AtomicLong heapSize = new AtomicLong(0);
  /** Current number of cached elements */
  private final AtomicLong blockNumber = new AtomicLong(0);
  // Blocks dropped because their writer queue was full; see cacheBlockWithWait().
  private final AtomicLong failedBlockAdditions = new AtomicLong(0);

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong(0);

  // Max milliseconds cacheBlockWithWait() blocks on a full writer queue.
  private static final int DEFAULT_CACHE_WAIT_TIME = 50;
  // Used in test now. If the flag is false and the cache speed is very fast,
  // bucket cache will skip some blocks when caching. If the flag is true, we
  // will wait blocks flushed to IOEngine for some time when caching
  boolean wait_when_cache = false;

  private final BucketCacheStats cacheStats = new BucketCacheStats();

  private final String persistencePath;
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  /**
   * A "sparse lock" implementation allowing to lock on a particular block
   * identified by offset. The purpose of this is to avoid freeing the block
   * which is being read.
   *
   * TODO:We could extend the IdLock to IdReadWriteLock for better.
   */
  @VisibleForTesting
  final IdLock offsetLock = new IdLock();

  // Secondary index of cache keys per hfile name, ordered by block offset —
  // presumably used for whole-file eviction elsewhere in this class (not visible here).
  private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
      new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
        @Override
        public int compare(BlockCacheKey a, BlockCacheKey b) {
          if (a.getOffset() == b.getOffset()) {
            return 0;
          } else if (a.getOffset() < b.getOffset()) {
            return -1;
          }
          return 1;
        }
      });

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
    new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private BucketAllocator bucketAllocator;
212 
213   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
214       int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
215       IOException {
216     this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
217       persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
218   }
219 
  /**
   * Fully-parameterized constructor: creates the IOEngine, sizes the allocator and
   * maps, optionally restores a persisted index, then starts the writer threads
   * and the periodic statistics logger.
   * @param ioEngineName engine spec, e.g. "file:<path>", "offheap" or "heap"
   * @param capacity total cache capacity in bytes
   * @param blockSize approximate block size in bytes
   * @param bucketSizes bucket sizes handed to the {@link BucketAllocator}
   * @param writerThreadNum number of writer threads (one queue per thread)
   * @param writerQLen capacity of each writer queue
   * @param persistencePath where the index is persisted across restarts, or null
   * @param ioErrorsTolerationDuration milliseconds of continuous IO errors tolerated
   *          before the cache disables itself
   */
  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
      throws FileNotFoundException, IOException {
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
    this.writerThreads = new WriterThread[writerThreadNum];
    // backingMap is presized to blockNumCapacity below, so it must fit in an int.
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
    }

    this.cacheCapacity = capacity;
    this.persistencePath = persistencePath;
    this.blockSize = blockSize;
    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;

    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
    for (int i = 0; i < writerThreads.length; ++i) {
      writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
    }

    assert writerQueues.size() == writerThreads.length;
    this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();

    this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);

    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        // Rebuild backingMap/allocator state from the persisted index file.
        retrieveFromFile(bucketSizes);
      } catch (IOException ioex) {
        // Non-fatal: start with an empty cache instead.
        LOG.error("Can't restore from file because of", ioex);
      } catch (ClassNotFoundException cnfe) {
        LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
        throw new RuntimeException(cnfe);
      }
    }
    final String threadName = Thread.currentThread().getName();
    // Enable before starting writers so their run() loops don't exit immediately.
    this.cacheEnabled = true;
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i] = new WriterThread(writerQueues.get(i));
      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      writerThreads[i].setDaemon(true);
    }
    startWriterThreads();

    // Run the statistics thread periodically to print the cache statistics log
    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
    LOG.info("Started bucket cache; ioengine=" + ioEngineName +
        ", capacity=" + StringUtils.byteDesc(capacity) +
      ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
        writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
      persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
  }
276 
277   /**
278    * Called by the constructor to start the writer threads. Used by tests that need to override
279    * starting the threads.
280    */
281   @VisibleForTesting
282   protected void startWriterThreads() {
283     for (WriterThread thread : writerThreads) {
284       thread.start();
285     }
286   }
287 
288   @VisibleForTesting
289   boolean isCacheEnabled() {
290     return this.cacheEnabled;
291   }
292 
293   public long getMaxSize() {
294     return this.cacheCapacity;
295   }
296 
297   public String getIoEngine() {
298     return ioEngine.toString();
299   }
300 
301   /**
302    * Get the IOEngine from the IO engine name
303    * @param ioEngineName
304    * @param capacity
305    * @return the IOEngine
306    * @throws IOException
307    */
308   private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
309       throws IOException {
310     if (ioEngineName.startsWith("file:"))
311       return new FileIOEngine(ioEngineName.substring(5), capacity);
312     else if (ioEngineName.startsWith("offheap"))
313       return new ByteBufferIOEngine(capacity, true);
314     else if (ioEngineName.startsWith("heap"))
315       return new ByteBufferIOEngine(capacity, false);
316     else
317       throw new IllegalArgumentException(
318           "Don't understand io engine name for cache - prefix with file:, heap or offheap");
319   }
320 
  /**
   * Cache the block with the specified name and buffer.
   * <p>Delegates to {@link #cacheBlock(BlockCacheKey, Cacheable, boolean, boolean)}
   * with inMemory=false and cacheDataInL1=false.
   * @param cacheKey block's cache key
   * @param buf block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false, false);
  }
330 
  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   * @param inMemory if block is in-memory
   * @param cacheDataInL1 unused here: BucketCache has no L1 to delegate to, so this
   *          flag is ignored and caching proceeds in this (L2) cache
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
      final boolean cacheDataInL1) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
  }
343 
  /**
   * Cache the block to ramCache
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   * @param inMemory if block is in-memory
   * @param wait if true, blocking wait when queue is full
   */
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
      boolean wait) {
    if (!cacheEnabled) {
      return;
    }

    // Already written to the IOEngine; nothing to do.
    if (backingMap.containsKey(cacheKey)) {
      return;
    }

    /*
     * Stuff the entry into the RAM cache so it can get drained to the persistent store
     */
    RAMQueueEntry re =
        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
    // putIfAbsent: a concurrent caller may be caching the same key; first one wins.
    if (ramCache.putIfAbsent(cacheKey, re) != null) {
      return;
    }
    // Mask the sign bit so the modulo yields a valid queue index for any hashCode.
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = false;
    if (wait) {
      try {
        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        // Restore the interrupt flag; the offer is treated as failed below.
        Thread.currentThread().interrupt();
      }
    } else {
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      // Writer queue full: drop the block rather than block the caller.
      ramCache.remove(cacheKey);
      failedBlockAdditions.incrementAndGet();
    } else {
      this.blockNumber.incrementAndGet();
      this.heapSize.addAndGet(cachedItem.heapSize());
      blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
    }
  }
390 
  /**
   * Get the buffer of the block with the specified key.
   * @param key block's cache key
   * @param caching true if the caller caches blocks on cache misses
   * @param repeat Whether this is a repeat lookup for the same block
   * @param updateCacheMetrics Whether we should update cache metrics or not
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    if (!cacheEnabled) {
      return null;
    }
    // Check the RAM cache first: the block may not have been drained to the IOEngine yet.
    RAMQueueEntry re = ramCache.get(key);
    if (re != null) {
      if (updateCacheMetrics) {
        cacheStats.hit(caching);
      }
      re.access(accessCount.incrementAndGet());
      return re.getData();
    }
    BucketEntry bucketEntry = backingMap.get(key);
    if (bucketEntry != null) {
      long start = System.nanoTime();
      IdLock.Entry lockEntry = null;
      try {
        // Lock on the entry's offset so the freeing path can't reuse it mid-read.
        lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
        // We can not read here even if backingMap does contain the given key because its offset
        // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
        // existence here.
        if (bucketEntry.equals(backingMap.get(key))) {
          // TODO : change this area - should be removed after server cells and
          // 12295 are available
          int len = bucketEntry.getLength();
          Pair<ByteBuff, MemoryType> pair = ioEngine.read(bucketEntry.offset(), len);
          ByteBuff bb = pair.getFirst();
          CacheableDeserializer<Cacheable> deserializer =
            bucketEntry.deserializerReference(this.deserialiserMap);
          Cacheable cachedBlock = deserializer.deserialize(bb, true, pair.getSecond());
          long timeTaken = System.nanoTime() - start;
          if (updateCacheMetrics) {
            cacheStats.hit(caching);
            cacheStats.ioHit(timeTaken);
          }
          // SHARED memory means the caller reads the cache's own buffer, so count the
          // reader; evictBlock() refuses to free entries with a non-zero refCount.
          if (pair.getSecond() == MemoryType.SHARED) {
            bucketEntry.refCount.incrementAndGet();
          }
          bucketEntry.access(accessCount.incrementAndGet());
          // Successful read: close any open IO-error window.
          if (this.ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          return cachedBlock;
        }
      } catch (IOException ioex) {
        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
        // Disable the whole cache if errors have persisted past the tolerated duration.
        checkIOErrorIsTolerated();
      } finally {
        if (lockEntry != null) {
          offsetLock.releaseLockEntry(lockEntry);
        }
      }
    }
    if (!repeat && updateCacheMetrics) {
      cacheStats.miss(caching);
    }
    return null;
  }
459 
460   @VisibleForTesting
461   void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
462     bucketAllocator.freeBlock(bucketEntry.offset());
463     realCacheSize.addAndGet(-1 * bucketEntry.getLength());
464     blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
465     if (decrementBlockNumber) {
466       this.blockNumber.decrementAndGet();
467     }
468   }
469 
470   @Override
471   public boolean evictBlock(BlockCacheKey cacheKey) {
472     return evictBlock(cacheKey, true);
473   }
474 
  /**
   * Evict the block regardless of its reference count: removes it from the RAM
   * cache and the backing map without consulting refCount, so callers must know
   * no reader can still be using the block.
   * @param cacheKey block to evict
   * @return true if an entry was removed from either cache level
   */
  // does not check for the ref count. Just tries to evict it if found in the
  // bucket map
  private boolean forceEvict(BlockCacheKey cacheKey) {
    if (!cacheEnabled) {
      return false;
    }
    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry == null) {
      if (removedBlock != null) {
        // Block only existed in the RAM cache; count it as a (zero-age) eviction.
        cacheStats.evicted(0);
        return true;
      } else {
        return false;
      }
    }
    IdLock.Entry lockEntry = null;
    try {
      // Same offset lock as getBlock() takes, so we never free a block mid-read.
      lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
      // remove(key, value): only succeeds if the map still holds this exact entry.
      if (backingMap.remove(cacheKey, bucketEntry)) {
        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
      } else {
        return false;
      }
    } catch (IOException ie) {
      LOG.warn("Failed evicting block " + cacheKey);
      return false;
    } finally {
      if (lockEntry != null) {
        offsetLock.releaseLockEntry(lockEntry);
      }
    }
    cacheStats.evicted(bucketEntry.getCachedTime());
    return true;
  }
510 
511   private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
512     RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
513     if (removedBlock != null) {
514       this.blockNumber.decrementAndGet();
515       this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
516     }
517     return removedBlock;
518   }
519 
  /**
   * Evict the block for the given key from the RAM cache and the backing map.
   * <p>A block whose refCount is non-zero (readers hold a shared reference to its
   * buffer) is not freed immediately: when {@code deletedBlock} is true it is
   * marked for eviction once released; otherwise the eviction is refused.
   * @param cacheKey block to evict
   * @param deletedBlock true to mark a still-referenced block for later eviction
   * @return true if the block was evicted (or existed only in the RAM cache)
   */
  public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
    if (!cacheEnabled) {
      return false;
    }
    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry == null) {
      if (removedBlock != null) {
        // Block only existed in the RAM cache; count it as a (zero-age) eviction.
        cacheStats.evicted(0);
        return true;
      } else {
        return false;
      }
    }
    IdLock.Entry lockEntry = null;
    try {
      // Same offset lock as getBlock() takes, so we never free a block mid-read.
      lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
      int refCount = bucketEntry.refCount.get();
      if(refCount == 0) {
        // remove(key, value): only succeeds if the map still holds this exact entry.
        if (backingMap.remove(cacheKey, bucketEntry)) {
          blockEvicted(cacheKey, bucketEntry, removedBlock == null);
        } else {
          return false;
        }
      } else {
        if(!deletedBlock) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("This block " + cacheKey + " is still referred by " + refCount
                + " readers. Can not be freed now");
          }
          return false;
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("This block " + cacheKey + " is still referred by " + refCount
                + " readers. Can not be freed now. Hence will mark this"
                + " for evicting at a later point");
          }
          // Deferred eviction: freed once the last reader releases its reference.
          bucketEntry.markedForEvict = true;
        }
      }
    } catch (IOException ie) {
      LOG.warn("Failed evicting block " + cacheKey);
      return false;
    } finally {
      if (lockEntry != null) {
        offsetLock.releaseLockEntry(lockEntry);
      }
    }
    cacheStats.evicted(bucketEntry.getCachedTime());
    return true;
  }
571 
572   /*
573    * Statistics thread.  Periodically output cache statistics to the log.
574    */
575   private static class StatisticsThread extends Thread {
576     private final BucketCache bucketCache;
577 
578     public StatisticsThread(BucketCache bucketCache) {
579       super("BucketCacheStatsThread");
580       setDaemon(true);
581       this.bucketCache = bucketCache;
582     }
583 
584     @Override
585     public void run() {
586       bucketCache.logStats();
587     }
588   }
589 
  /**
   * Log a one-line summary of cache sizing and hit/eviction statistics.
   * Note: resets the interval statistics counters after logging.
   */
  public void logStats() {
    long totalSize = bucketAllocator.getTotalSize();
    long usedSize = bucketAllocator.getUsedSize();
    long freeSize = totalSize - usedSize;
    long cacheSize = getRealCacheSize();
    LOG.info("failedBlockAdditions=" + getFailedBlockAdditions() + ", " +
        "totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
        "usedSize=" + StringUtils.byteDesc(usedSize) +", " +
        "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " +
        "accesses=" + cacheStats.getRequestCount() + ", " +
        "hits=" + cacheStats.getHitCount() + ", " +
        "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
        "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
          (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
        "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
        "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
        "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
          (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
        "evictions=" + cacheStats.getEvictionCount() + ", " +
        "evicted=" + cacheStats.getEvictedCount() + ", " +
        "evictedPerRun=" + cacheStats.evictedPerEviction());
    cacheStats.reset();
  }
615 
616   public long getFailedBlockAdditions() {
617     return this.failedBlockAdditions.get();
618   }
619 
620   public long getRealCacheSize() {
621     return this.realCacheSize.get();
622   }
623 
624   private long acceptableSize() {
625     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR);
626   }
627 
628   private long singleSize() {
629     return (long) Math.floor(bucketAllocator.getTotalSize()
630         * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR);
631   }
632 
633   private long multiSize() {
634     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR
635         * DEFAULT_MIN_FACTOR);
636   }
637 
638   private long memorySize() {
639     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR
640         * DEFAULT_MIN_FACTOR);
641   }
642 
643   /**
644    * Free the space if the used size reaches acceptableSize() or one size block
645    * couldn't be allocated. When freeing the space, we use the LRU algorithm and
646    * ensure there must be some blocks evicted
647    * @param why Why we are being called
648    */
649   private void freeSpace(final String why) {
650     // Ensure only one freeSpace progress at a time
651     if (!freeSpaceLock.tryLock()) return;
652     try {
653       freeInProgress = true;
654       long bytesToFreeWithoutExtra = 0;
655       // Calculate free byte for each bucketSizeinfo
656       StringBuffer msgBuffer = LOG.isDebugEnabled()? new StringBuffer(): null;
657       BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
658       long[] bytesToFreeForBucket = new long[stats.length];
659       for (int i = 0; i < stats.length; i++) {
660         bytesToFreeForBucket[i] = 0;
661         long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
662         freeGoal = Math.max(freeGoal, 1);
663         if (stats[i].freeCount() < freeGoal) {
664           bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
665           bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
666           if (msgBuffer != null) {
667             msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
668               + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
669           }
670         }
671       }
672       if (msgBuffer != null) {
673         msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
674       }
675 
676       if (bytesToFreeWithoutExtra <= 0) {
677         return;
678       }
679       long currentSize = bucketAllocator.getUsedSize();
680       long totalSize=bucketAllocator.getTotalSize();
681       if (LOG.isDebugEnabled() && msgBuffer != null) {
682         LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
683           " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
684           StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize));
685       }
686 
687       long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
688           * (1 + DEFAULT_EXTRA_FREE_FACTOR));
689 
690       // Instantiate priority buckets
691       BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
692           blockSize, singleSize());
693       BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
694           blockSize, multiSize());
695       BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
696           blockSize, memorySize());
697 
698       // Scan entire map putting bucket entry into appropriate bucket entry
699       // group
700       for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
701         switch (bucketEntryWithKey.getValue().getPriority()) {
702           case SINGLE: {
703             bucketSingle.add(bucketEntryWithKey);
704             break;
705           }
706           case MULTI: {
707             bucketMulti.add(bucketEntryWithKey);
708             break;
709           }
710           case MEMORY: {
711             bucketMemory.add(bucketEntryWithKey);
712             break;
713           }
714         }
715       }
716 
717       PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3);
718 
719       bucketQueue.add(bucketSingle);
720       bucketQueue.add(bucketMulti);
721       bucketQueue.add(bucketMemory);
722 
723       int remainingBuckets = 3;
724       long bytesFreed = 0;
725 
726       BucketEntryGroup bucketGroup;
727       while ((bucketGroup = bucketQueue.poll()) != null) {
728         long overflow = bucketGroup.overflow();
729         if (overflow > 0) {
730           long bucketBytesToFree = Math.min(overflow,
731               (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
732           bytesFreed += bucketGroup.free(bucketBytesToFree);
733         }
734         remainingBuckets--;
735       }
736 
737       /**
738        * Check whether need extra free because some bucketSizeinfo still needs
739        * free space
740        */
741       stats = bucketAllocator.getIndexStatistics();
742       boolean needFreeForExtra = false;
743       for (int i = 0; i < stats.length; i++) {
744         long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
745         freeGoal = Math.max(freeGoal, 1);
746         if (stats[i].freeCount() < freeGoal) {
747           needFreeForExtra = true;
748           break;
749         }
750       }
751 
752       if (needFreeForExtra) {
753         bucketQueue.clear();
754         remainingBuckets = 2;
755 
756         bucketQueue.add(bucketSingle);
757         bucketQueue.add(bucketMulti);
758 
759         while ((bucketGroup = bucketQueue.poll()) != null) {
760           long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
761           bytesFreed += bucketGroup.free(bucketBytesToFree);
762           remainingBuckets--;
763         }
764       }
765 
766       if (LOG.isDebugEnabled()) {
767         long single = bucketSingle.totalSize();
768         long multi = bucketMulti.totalSize();
769         long memory = bucketMemory.totalSize();
770         if (LOG.isDebugEnabled()) {
771           LOG.debug("Bucket cache free space completed; " + "freed="
772             + StringUtils.byteDesc(bytesFreed) + ", " + "total="
773             + StringUtils.byteDesc(totalSize) + ", " + "single="
774             + StringUtils.byteDesc(single) + ", " + "multi="
775             + StringUtils.byteDesc(multi) + ", " + "memory="
776             + StringUtils.byteDesc(memory));
777         }
778       }
779 
780     } catch (Throwable t) {
781       LOG.warn("Failed freeing space", t);
782     } finally {
783       cacheStats.evict();
784       freeInProgress = false;
785       freeSpaceLock.unlock();
786     }
787   }
788 
  // This handles flushing the RAM cache to IOEngine.
  @VisibleForTesting
  class WriterThread extends HasThread {
    // Queue of blocks awaiting flush to the IOEngine; shared with the caching path.
    private final BlockingQueue<RAMQueueEntry> inputQueue;
    // Flipped off by tests via disableWriter() to stop the loop without disabling the cache.
    private volatile boolean writerEnabled = true;

    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
      this.inputQueue = queue;
    }

    // Used for test
    @VisibleForTesting
    void disableWriter() {
      this.writerEnabled = false;
    }

    public void run() {
      List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            try {
              // Blocks
              entries = getRAMQueueEntries(inputQueue, entries);
            } catch (InterruptedException ie) {
              // NOTE(review): interrupt status is not restored; interruption is treated purely
              // as a shutdown nudge. Safe to fall through: getRAMQueueEntries clears the list
              // before it blocks on take(), so doDrain below sees an empty batch.
              if (!cacheEnabled) break;
            }
            doDrain(entries);
          } catch (Exception ioe) {
            // Keep the writer alive across per-batch failures; doDrain does its own
            // ramCache cleanup so a failed batch cannot leak entries.
            LOG.error("WriterThread encountered error", ioe);
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed doing drain", t);
      }
      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
    }

    /**
     * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
     * Process all that are passed in even if failure being sure to remove from ramCache else we'll
     * never undo the references and we'll OOME.
     * @param entries Presumes list passed in here will be processed by this invocation only. No
     * interference expected.
     * @throws InterruptedException
     */
    @VisibleForTesting
    void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
      if (entries.isEmpty()) {
        return;
      }
      // This method is a little hard to follow. We run through the passed in entries and for each
      // successful add, we add a non-null BucketEntry to the below bucketEntries.  Later we must
      // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
      // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
      // filling ramCache.  We do the clean up by again running through the passed in entries
      // doing extra work when we find a non-null bucketEntries corresponding entry.
      final int size = entries.size();
      BucketEntry[] bucketEntries = new BucketEntry[size];
      // Index updated inside loop if success or if we can't succeed. We retry if cache is full
      // when we go to add an entry by going around the loop again without upping the index.
      int index = 0;
      while (cacheEnabled && index < size) {
        RAMQueueEntry re = null;
        try {
          re = entries.get(index);
          if (re == null) {
            LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
            index++;
            continue;
          }
          BucketEntry bucketEntry =
            re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
          // Successfully added.  Up index and add bucketEntry. Clear io exceptions.
          bucketEntries[index] = bucketEntry;
          if (ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          index++;
        } catch (BucketAllocatorException fle) {
          LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
          // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
          bucketEntries[index] = null;
          index++;
        } catch (CacheFullException cfe) {
          // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
          if (!freeInProgress) {
            freeSpace("Full!");
          } else {
            Thread.sleep(50);
          }
        } catch (IOException ioex) {
          // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
          LOG.error("Failed writing to bucket cache", ioex);
          checkIOErrorIsTolerated();
        }
      }

      // Make sure data pages are written are on media before we update maps.
      try {
        ioEngine.sync();
      } catch (IOException ioex) {
        LOG.error("Failed syncing IO engine", ioex);
        checkIOErrorIsTolerated();
        // Since we failed sync, free the blocks in bucket allocator
        // (nulling bucketEntries[i] below also prevents the backingMap update further down).
        for (int i = 0; i < entries.size(); ++i) {
          if (bucketEntries[i] != null) {
            bucketAllocator.freeBlock(bucketEntries[i].offset());
            bucketEntries[i] = null;
          }
        }
      }

      // Now add to backingMap if successfully added to bucket cache.  Remove from ramCache if
      // success or error.
      for (int i = 0; i < size; ++i) {
        BlockCacheKey key = entries.get(i).getKey();
        // Only add if non-null entry.
        if (bucketEntries[i] != null) {
          backingMap.put(key, bucketEntries[i]);
        }
        // Always remove from ramCache even if we failed adding it to the block cache above.
        RAMQueueEntry ramCacheEntry = ramCache.remove(key);
        if (ramCacheEntry != null) {
          heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
        } else if (bucketEntries[i] != null){
          // Block should have already been evicted. Remove it and free space.
          // Lock on the entry's offset so we don't race a concurrent reader of the same block.
          IdLock.Entry lockEntry = null;
          try {
            lockEntry = offsetLock.getLockEntry(bucketEntries[i].offset());
            if (backingMap.remove(key, bucketEntries[i])) {
              blockEvicted(key, bucketEntries[i], false);
            }
          } catch (IOException e) {
            LOG.warn("failed to free space for " + key, e);
          } finally {
            if (lockEntry != null) {
              offsetLock.releaseLockEntry(lockEntry);
            }
          }
        }
      }

      // If the batch pushed us over the acceptable watermark, kick off an eviction pass.
      long used = bucketAllocator.getUsedSize();
      if (used > acceptableSize()) {
        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
      }
      return;
    }
  }
939 
940   /**
941    * Blocks until elements available in <code>q</code> then tries to grab as many as possible
942    * before returning.
943    * @param recepticle Where to stash the elements taken from queue. We clear before we use it
944    * just in case.
945    * @param q The queue to take from.
946    * @return <code>receptical laden with elements taken from the queue or empty if none found.
947    */
948   @VisibleForTesting
949   static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
950       final List<RAMQueueEntry> receptical)
951   throws InterruptedException {
952     // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
953     // ok even if list grew to accommodate thousands.
954     receptical.clear();
955     receptical.add(q.take());
956     q.drainTo(receptical);
957     return receptical;
958   }
959 
960   private void persistToFile() throws IOException {
961     assert !cacheEnabled;
962     FileOutputStream fos = null;
963     ObjectOutputStream oos = null;
964     try {
965       if (!ioEngine.isPersistent())
966         throw new IOException(
967             "Attempt to persist non-persistent cache mappings!");
968       fos = new FileOutputStream(persistencePath, false);
969       oos = new ObjectOutputStream(fos);
970       oos.writeLong(cacheCapacity);
971       oos.writeUTF(ioEngine.getClass().getName());
972       oos.writeUTF(backingMap.getClass().getName());
973       oos.writeObject(deserialiserMap);
974       oos.writeObject(backingMap);
975     } finally {
976       if (oos != null) oos.close();
977       if (fos != null) fos.close();
978     }
979   }
980 
981   @SuppressWarnings("unchecked")
982   private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
983       ClassNotFoundException {
984     File persistenceFile = new File(persistencePath);
985     if (!persistenceFile.exists()) {
986       return;
987     }
988     assert !cacheEnabled;
989     FileInputStream fis = null;
990     ObjectInputStream ois = null;
991     try {
992       if (!ioEngine.isPersistent())
993         throw new IOException(
994             "Attempt to restore non-persistent cache mappings!");
995       fis = new FileInputStream(persistencePath);
996       ois = new ObjectInputStream(fis);
997       long capacitySize = ois.readLong();
998       if (capacitySize != cacheCapacity)
999         throw new IOException("Mismatched cache capacity:"
1000             + StringUtils.byteDesc(capacitySize) + ", expected: "
1001             + StringUtils.byteDesc(cacheCapacity));
1002       String ioclass = ois.readUTF();
1003       String mapclass = ois.readUTF();
1004       if (!ioEngine.getClass().getName().equals(ioclass))
1005         throw new IOException("Class name for IO engine mismatch: " + ioclass
1006             + ", expected:" + ioEngine.getClass().getName());
1007       if (!backingMap.getClass().getName().equals(mapclass))
1008         throw new IOException("Class name for cache map mismatch: " + mapclass
1009             + ", expected:" + backingMap.getClass().getName());
1010       UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
1011           .readObject();
1012       BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
1013           backingMap, realCacheSize);
1014       backingMap = (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois
1015           .readObject();
1016       bucketAllocator = allocator;
1017       deserialiserMap = deserMap;
1018     } finally {
1019       if (ois != null) ois.close();
1020       if (fis != null) fis.close();
1021       if (!persistenceFile.delete()) {
1022         throw new IOException("Failed deleting persistence file "
1023             + persistenceFile.getAbsolutePath());
1024       }
1025     }
1026   }
1027 
1028   /**
1029    * Check whether we tolerate IO error this time. If the duration of IOEngine
1030    * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the
1031    * cache
1032    */
1033   private void checkIOErrorIsTolerated() {
1034     long now = EnvironmentEdgeManager.currentTime();
1035     if (this.ioErrorStartTime > 0) {
1036       if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
1037         LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
1038           "ms, disabing cache, please check your IOEngine");
1039         disableCache();
1040       }
1041     } else {
1042       this.ioErrorStartTime = now;
1043     }
1044   }
1045 
1046   /**
1047    * Used to shut down the cache -or- turn it off in the case of something
1048    * broken.
1049    */
1050   private void disableCache() {
1051     if (!cacheEnabled)
1052       return;
1053     cacheEnabled = false;
1054     ioEngine.shutdown();
1055     this.scheduleThreadPool.shutdown();
1056     for (int i = 0; i < writerThreads.length; ++i)
1057       writerThreads[i].interrupt();
1058     this.ramCache.clear();
1059     if (!ioEngine.isPersistent() || persistencePath == null) {
1060       this.backingMap.clear();
1061     }
1062   }
1063 
1064   private void join() throws InterruptedException {
1065     for (int i = 0; i < writerThreads.length; ++i)
1066       writerThreads[i].join();
1067   }
1068 
1069   @Override
1070   public void shutdown() {
1071     disableCache();
1072     LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
1073         + "; path to write=" + persistencePath);
1074     if (ioEngine.isPersistent() && persistencePath != null) {
1075       try {
1076         join();
1077         persistToFile();
1078       } catch (IOException ex) {
1079         LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1080       } catch (InterruptedException e) {
1081         LOG.warn("Failed to persist data on exit", e);
1082       }
1083     }
1084   }
1085 
  /** @return runtime hit/miss/eviction statistics for this cache */
  @Override
  public CacheStats getStats() {
    return cacheStats;
  }
1090 
  /** @return the BucketAllocator that manages space within the IOEngine */
  public BucketAllocator getAllocator() {
    return this.bucketAllocator;
  }
1094 
  // Tracks on-heap bytes for blocks still sitting in ramCache; doDrain decrements it as
  // entries are flushed (presumably incremented on the caching path — confirm in cacheBlock).
  @Override
  public long heapSize() {
    return this.heapSize.get();
  }
1099 
  /** @return total bytes of data written into the IOEngine (the realCacheSize counter) */
  @Override
  public long size() {
    return this.realCacheSize.get();
  }
1104 
  /** @return free space remaining in the IOEngine, as reported by the allocator */
  @Override
  public long getFreeSize() {
    return this.bucketAllocator.getFreeSize();
  }
1109 
  /** @return the current value of the block counter (number of cached blocks) */
  @Override
  public long getBlockCount() {
    return this.blockNumber.get();
  }
1114 
  /** @return bytes currently allocated within the IOEngine, per the allocator's accounting */
  @Override
  public long getCurrentSize() {
    return this.bucketAllocator.getUsedSize();
  }
1119 
1120   /**
1121    * Evicts all blocks for a specific HFile.
1122    * <p>
1123    * This is used for evict-on-close to remove all blocks of a specific HFile.
1124    *
1125    * @return the number of blocks evicted
1126    */
1127   @Override
1128   public int evictBlocksByHfileName(String hfileName) {
1129     // Copy the list to avoid ConcurrentModificationException
1130     // as evictBlockKey removes the key from the index
1131     Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName);
1132     if (keySet == null) {
1133       return 0;
1134     }
1135     int numEvicted = 0;
1136     List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
1137     for (BlockCacheKey key : keysForHFile) {
1138       if (evictBlock(key)) {
1139           ++numEvicted;
1140       }
1141     }
1142 
1143     return numEvicted;
1144   }
1145 
1146   /**
1147    * Item in cache. We expect this to be where most memory goes. Java uses 8
1148    * bytes just for object headers; after this, we want to use as little as
1149    * possible - so we only use 8 bytes, but in order to do so we end up messing
1150    * around with all this Java casting stuff. Offset stored as 5 bytes that make
1151    * up the long. Doubt we'll see devices this big for ages. Offsets are divided
1152    * by 256. So 5 bytes gives us 256TB or so.
1153    */
1154   static class BucketEntry implements Serializable {
1155     private static final long serialVersionUID = -6741504807982257534L;
1156 
1157     // access counter comparator, descending order
1158     static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
1159 
1160       @Override
1161       public int compare(BucketEntry o1, BucketEntry o2) {
1162         long accessCounter1 = o1.accessCounter;
1163         long accessCounter2 = o2.accessCounter;
1164         return accessCounter1 < accessCounter2 ? 1 : accessCounter1 == accessCounter2 ? 0 : -1;
1165       }
1166     };
1167 
1168     private int offsetBase;
1169     private int length;
1170     private byte offset1;
1171     byte deserialiserIndex;
1172     private volatile long accessCounter;
1173     private BlockPriority priority;
1174     // Set this when we were not able to forcefully evict the block
1175     private volatile boolean markedForEvict;
1176     private AtomicInteger refCount = new AtomicInteger(0);
1177 
1178     /**
1179      * Time this block was cached.  Presumes we are created just before we are added to the cache.
1180      */
1181     private final long cachedTime = System.nanoTime();
1182 
1183     BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
1184       setOffset(offset);
1185       this.length = length;
1186       this.accessCounter = accessCounter;
1187       if (inMemory) {
1188         this.priority = BlockPriority.MEMORY;
1189       } else {
1190         this.priority = BlockPriority.SINGLE;
1191       }
1192     }
1193 
1194     long offset() { // Java has no unsigned numbers
1195       long o = ((long) offsetBase) & 0xFFFFFFFF;
1196       o += (((long) (offset1)) & 0xFF) << 32;
1197       return o << 8;
1198     }
1199 
1200     private void setOffset(long value) {
1201       assert (value & 0xFF) == 0;
1202       value >>= 8;
1203       offsetBase = (int) value;
1204       offset1 = (byte) (value >> 32);
1205     }
1206 
1207     public int getLength() {
1208       return length;
1209     }
1210 
1211     protected CacheableDeserializer<Cacheable> deserializerReference(
1212         UniqueIndexMap<Integer> deserialiserMap) {
1213       return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
1214           .unmap(deserialiserIndex));
1215     }
1216 
1217     protected void setDeserialiserReference(
1218         CacheableDeserializer<Cacheable> deserializer,
1219         UniqueIndexMap<Integer> deserialiserMap) {
1220       this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
1221           .getDeserialiserIdentifier()));
1222     }
1223 
1224     /**
1225      * Block has been accessed. Update its local access counter.
1226      */
1227     public void access(long accessCounter) {
1228       this.accessCounter = accessCounter;
1229       if (this.priority == BlockPriority.SINGLE) {
1230         this.priority = BlockPriority.MULTI;
1231       }
1232     }
1233 
1234     public BlockPriority getPriority() {
1235       return this.priority;
1236     }
1237 
1238     public long getCachedTime() {
1239       return cachedTime;
1240     }
1241   }
1242 
1243   /**
1244    * Used to group bucket entries into priority buckets. There will be a
1245    * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
1246    * the eviction algorithm takes the appropriate number of elements out of each
1247    * according to configuration parameters and their relative sizes.
1248    */
1249   private class BucketEntryGroup implements Comparable<BucketEntryGroup> {
1250 
1251     private CachedEntryQueue queue;
1252     private long totalSize = 0;
1253     private long bucketSize;
1254 
1255     public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1256       this.bucketSize = bucketSize;
1257       queue = new CachedEntryQueue(bytesToFree, blockSize);
1258       totalSize = 0;
1259     }
1260 
1261     public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1262       totalSize += block.getValue().getLength();
1263       queue.add(block);
1264     }
1265 
1266     public long free(long toFree) {
1267       Map.Entry<BlockCacheKey, BucketEntry> entry;
1268       long freedBytes = 0;
1269       // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
1270       // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
1271       while ((entry = queue.pollLast()) != null) {
1272         if (evictBlock(entry.getKey(), false)) {
1273           freedBytes += entry.getValue().getLength();
1274         }
1275         if (freedBytes >= toFree) {
1276           return freedBytes;
1277         }
1278       }
1279       return freedBytes;
1280     }
1281 
1282     public long overflow() {
1283       return totalSize - bucketSize;
1284     }
1285 
1286     public long totalSize() {
1287       return totalSize;
1288     }
1289 
1290     @Override
1291     public int compareTo(BucketEntryGroup that) {
1292       if (this.overflow() == that.overflow())
1293         return 0;
1294       return this.overflow() > that.overflow() ? 1 : -1;
1295     }
1296 
1297     @Override
1298     public boolean equals(Object that) {
1299       return this == that;
1300     }
1301 
1302   }
1303 
1304   /**
1305    * Block Entry stored in the memory with key,data and so on
1306    */
1307   @VisibleForTesting
1308   static class RAMQueueEntry {
1309     private BlockCacheKey key;
1310     private Cacheable data;
1311     private long accessCounter;
1312     private boolean inMemory;
1313 
1314     public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
1315         boolean inMemory) {
1316       this.key = bck;
1317       this.data = data;
1318       this.accessCounter = accessCounter;
1319       this.inMemory = inMemory;
1320     }
1321 
1322     public Cacheable getData() {
1323       return data;
1324     }
1325 
1326     public BlockCacheKey getKey() {
1327       return key;
1328     }
1329 
1330     public void access(long accessCounter) {
1331       this.accessCounter = accessCounter;
1332     }
1333 
1334     public BucketEntry writeToCache(final IOEngine ioEngine,
1335         final BucketAllocator bucketAllocator,
1336         final UniqueIndexMap<Integer> deserialiserMap,
1337         final AtomicLong realCacheSize) throws CacheFullException, IOException,
1338         BucketAllocatorException {
1339       int len = data.getSerializedLength();
1340       // This cacheable thing can't be serialized...
1341       if (len == 0) return null;
1342       long offset = bucketAllocator.allocateBlock(len);
1343       BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
1344       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
1345       try {
1346         if (data instanceof HFileBlock) {
1347           HFileBlock block = (HFileBlock) data;
1348           ByteBuff sliceBuf = block.getBufferReadOnlyWithHeader();
1349           sliceBuf.rewind();
1350           assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE ||
1351             len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1352           ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
1353           block.serializeExtraInfo(extraInfoBuffer);
1354           ioEngine.write(sliceBuf, offset);
1355           ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
1356         } else {
1357           ByteBuffer bb = ByteBuffer.allocate(len);
1358           data.serialize(bb);
1359           ioEngine.write(bb, offset);
1360         }
1361       } catch (IOException ioe) {
1362         // free it in bucket allocator
1363         bucketAllocator.freeBlock(offset);
1364         throw ioe;
1365       }
1366 
1367       realCacheSize.addAndGet(len);
1368       return bucketEntry;
1369     }
1370   }
1371 
1372   /**
1373    * Only used in test
1374    * @throws InterruptedException
1375    */
1376   void stopWriterThreads() throws InterruptedException {
1377     for (WriterThread writerThread : writerThreads) {
1378       writerThread.disableWriter();
1379       writerThread.interrupt();
1380       writerThread.join();
1381     }
1382   }
1383 
1384   @Override
1385   public Iterator<CachedBlock> iterator() {
1386     // Don't bother with ramcache since stuff is in here only a little while.
1387     final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
1388         this.backingMap.entrySet().iterator();
1389     return new Iterator<CachedBlock>() {
1390       private final long now = System.nanoTime();
1391 
1392       @Override
1393       public boolean hasNext() {
1394         return i.hasNext();
1395       }
1396 
1397       @Override
1398       public CachedBlock next() {
1399         final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1400         return new CachedBlock() {
1401           @Override
1402           public String toString() {
1403             return BlockCacheUtil.toString(this, now);
1404           }
1405 
1406           @Override
1407           public BlockPriority getBlockPriority() {
1408             return e.getValue().getPriority();
1409           }
1410 
1411           @Override
1412           public BlockType getBlockType() {
1413             // Not held by BucketEntry.  Could add it if wanted on BucketEntry creation.
1414             return null;
1415           }
1416 
1417           @Override
1418           public long getOffset() {
1419             return e.getKey().getOffset();
1420           }
1421 
1422           @Override
1423           public long getSize() {
1424             return e.getValue().getLength();
1425           }
1426 
1427           @Override
1428           public long getCachedTime() {
1429             return e.getValue().getCachedTime();
1430           }
1431 
1432           @Override
1433           public String getFilename() {
1434             return e.getKey().getHfileName();
1435           }
1436 
1437           @Override
1438           public int compareTo(CachedBlock other) {
1439             int diff = this.getFilename().compareTo(other.getFilename());
1440             if (diff != 0) return diff;
1441             diff = (int)(this.getOffset() - other.getOffset());
1442             if (diff != 0) return diff;
1443             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1444               throw new IllegalStateException("" + this.getCachedTime() + ", " +
1445                 other.getCachedTime());
1446             }
1447             return (int)(other.getCachedTime() - this.getCachedTime());
1448           }
1449 
1450           @Override
1451           public int hashCode() {
1452             return e.getKey().hashCode();
1453           }
1454 
1455           @Override
1456           public boolean equals(Object obj) {
1457             if (obj instanceof CachedBlock) {
1458               CachedBlock cb = (CachedBlock)obj;
1459               return compareTo(cb) == 0;
1460             } else {
1461               return false;
1462             }
1463           }
1464         };
1465       }
1466 
1467       @Override
1468       public void remove() {
1469         throw new UnsupportedOperationException();
1470       }
1471     };
1472   }
1473 
  /** @return null; a BucketCache exposes no child caches */
  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }
1478 
  /**
   * Called when a reader is done with a block it checked out. For SHARED-memory blocks this
   * drops the entry's reference count; if the block was flagged for eviction while readers
   * still held it, the returner that drops the count to zero performs the deferred eviction.
   */
  @Override
  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
    if (block.getMemoryType() == MemoryType.SHARED) {
      BucketEntry bucketEntry = backingMap.get(cacheKey);
      if (bucketEntry != null) {
        // Decrement first, then test: only the caller that observes zero evicts, so the
        // deferred eviction runs at most once.
        int refCount = bucketEntry.refCount.decrementAndGet();
        if (bucketEntry.markedForEvict && refCount == 0) {
          forceEvict(cacheKey);
        }
      }
    }
  }
1491 
1492   @VisibleForTesting
1493   public int getRefCount(BlockCacheKey cacheKey) {
1494     BucketEntry bucketEntry = backingMap.get(cacheKey);
1495     if (bucketEntry != null) {
1496       return bucketEntry.refCount.get();
1497     }
1498     return 0;
1499   }
1500 }