1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.io.File;
24  import java.io.FileInputStream;
25  import java.io.FileNotFoundException;
26  import java.io.FileOutputStream;
27  import java.io.IOException;
28  import java.io.ObjectInputStream;
29  import java.io.ObjectOutputStream;
30  import java.io.Serializable;
31  import java.nio.ByteBuffer;
32  import java.util.ArrayList;
33  import java.util.Comparator;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NavigableSet;
38  import java.util.PriorityQueue;
39  import java.util.Set;
40  import java.util.concurrent.ArrayBlockingQueue;
41  import java.util.concurrent.BlockingQueue;
42  import java.util.concurrent.ConcurrentHashMap;
43  import java.util.concurrent.ConcurrentMap;
44  import java.util.concurrent.ConcurrentSkipListSet;
45  import java.util.concurrent.Executors;
46  import java.util.concurrent.ScheduledExecutorService;
47  import java.util.concurrent.TimeUnit;
48  import java.util.concurrent.atomic.AtomicInteger;
49  import java.util.concurrent.atomic.AtomicLong;
50  import java.util.concurrent.locks.Lock;
51  import java.util.concurrent.locks.ReentrantLock;
52  import java.util.concurrent.locks.ReentrantReadWriteLock;
53
54  import org.apache.commons.logging.Log;
55  import org.apache.commons.logging.LogFactory;
56  import org.apache.hadoop.hbase.classification.InterfaceAudience;
57  import org.apache.hadoop.hbase.io.HeapSize;
58  import org.apache.hadoop.hbase.io.hfile.BlockCache;
59  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
60  import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
61  import org.apache.hadoop.hbase.io.hfile.BlockPriority;
62  import org.apache.hadoop.hbase.io.hfile.BlockType;
63  import org.apache.hadoop.hbase.io.hfile.CacheStats;
64  import org.apache.hadoop.hbase.io.hfile.Cacheable;
65  import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
66  import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
67  import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
68  import org.apache.hadoop.hbase.io.hfile.CachedBlock;
69  import org.apache.hadoop.hbase.io.hfile.HFileBlock;
70  import org.apache.hadoop.hbase.nio.ByteBuff;
71  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
72  import org.apache.hadoop.hbase.util.HasThread;
73  import org.apache.hadoop.hbase.util.IdReadWriteLock;
74  import org.apache.hadoop.util.StringUtils;
75
76  import com.google.common.annotations.VisibleForTesting;
77  import com.google.common.util.concurrent.ThreadFactoryBuilder;
78  
79  /**
80   * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
81   * BucketCache#ramCache and BucketCache#backingMap in order to
82   * determine if a given element is in the cache. The bucket cache can use on-heap or
83   * off-heap memory ({@link ByteBufferIOEngine}) or a file ({@link FileIOEngine}) to
84   * store/read the block data.
85   *
86   * <p>Eviction is via a similar algorithm as used in
87   * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
88   *
89   * <p>BucketCache can be used as mainly a block cache (see
90   * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
91   * LruBlockCache to decrease CMS GC and heap fragmentation.
92   *
93   * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
94   * blocks) to enlarge cache space via
95   * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
96   */
97  @InterfaceAudience.Private
98  public class BucketCache implements BlockCache, HeapSize {
99    private static final Log LOG = LogFactory.getLog(BucketCache.class);
100 
101   /** Priority buckets */
102   private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
103   private static final float DEFAULT_MULTI_FACTOR = 0.50f;
104   private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
105   private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
106 
107   private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
108   private static final float DEFAULT_MIN_FACTOR = 0.85f;
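  // Worked example of the factors above (illustrative numbers, not from the source): with a
  // 10GB cache, freeSpace() kicks in once usage passes acceptableSize() = 10GB * 0.95 = 9.5GB,
  // and it tries to bring each bucket size back under DEFAULT_MIN_FACTOR, freeing an extra 10%
  // (DEFAULT_EXTRA_FREE_FACTOR) of headroom on top. The single/multi/memory priority groups are
  // bounded at 25%/50%/25% of the total, each scaled by DEFAULT_MIN_FACTOR (0.85).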
109 
110   /** Statistics thread */
111   private static final int statThreadPeriod = 5 * 60;
112 
113   final static int DEFAULT_WRITER_THREADS = 3;
114   final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
115 
116   // Store/read block data
117   final IOEngine ioEngine;
118 
119   // Store the block in this map before writing it to cache
120   @VisibleForTesting
121   final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
122   // In this map, store the block's meta data like offset, length
123   @VisibleForTesting
124   ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
125 
126   /**
127    * Flag indicating whether the cache is enabled. We shut it off if there are
128    * sustained IO errors, so that Bucket IO exceptions/errors don't bring down
129    * the HBase server.
130    */
131   private volatile boolean cacheEnabled;
132 
133   /**
134    * A list of writer queues.  We have a queue per {@link WriterThread} we have running.
135    * In other words, the work adding blocks to the BucketCache is divided up amongst the
136    * running WriterThreads.  A block is routed by taking the hash of its cache key modulo
137    * the queue count (see the sketch below).  When a WriterThread runs, it 'drains' entries
138    * to the BucketCache.  It then updates the ramCache and backingMap accordingly.
139    */
140   @VisibleForTesting
141   final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
142       new ArrayList<BlockingQueue<RAMQueueEntry>>();
143   @VisibleForTesting
144   final WriterThread[] writerThreads;
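  // Routing sketch (mirrors cacheBlockWithWait below): a given key always maps to the same
  // queue, so at most one writer thread ever handles a given block:
  //   int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
  // The 0x7FFFFFFF mask clears the sign bit so the index is non-negative.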
145 
146   /** Volatile boolean to track if free space is in process or not */
147   private volatile boolean freeInProgress = false;
148   private final Lock freeSpaceLock = new ReentrantLock();
149 
150   private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();
151 
152   private final AtomicLong realCacheSize = new AtomicLong(0);
153   private final AtomicLong heapSize = new AtomicLong(0);
154   /** Current number of cached elements */
155   private final AtomicLong blockNumber = new AtomicLong(0);
156 
157   /** Cache access count (sequential ID) */
158   private final AtomicLong accessCount = new AtomicLong(0);
159 
160   private static final int DEFAULT_CACHE_WAIT_TIME = 50;
161   // Used only in tests for now. If the flag is false and blocks arrive faster than the
162   // writer threads can drain them, the bucket cache will skip some blocks when caching.
163   // If the flag is true, we wait a short time for blocks to be flushed to the IOEngine.
164   boolean wait_when_cache = false;
165 
166   private final BucketCacheStats cacheStats = new BucketCacheStats();
167 
168   private final String persistencePath;
169   private final long cacheCapacity;
170   /** Approximate block size */
171   private final long blockSize;
172 
173   /** Duration of IO errors tolerated before we disable the cache, 1 min by default */
174   private final int ioErrorsTolerationDuration;
175   // 1 min
176   public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
177 
178   // Start time of first IO error when reading or writing IO Engine, it will be
179   // reset after a successful read/write.
180   private volatile long ioErrorStartTime = -1;
181 
182   /**
183    * A ReentrantReadWriteLock to lock on a particular block identified by offset.
184    * The purpose of this is to avoid freeing the block which is being read.
185    */
186   @VisibleForTesting
187   final IdReadWriteLock offsetLock = new IdReadWriteLock();
188 
189   private final NavigableSet<BlockCacheKey> blocksByHFile =
190       new ConcurrentSkipListSet<BlockCacheKey>(new Comparator<BlockCacheKey>() {
191         @Override
192         public int compare(BlockCacheKey a, BlockCacheKey b) {
193           int nameComparison = a.getHfileName().compareTo(b.getHfileName());
194           if (nameComparison != 0) {
195             return nameComparison;
196           }
197
198           if (a.getOffset() == b.getOffset()) {
199             return 0;
200           } else if (a.getOffset() < b.getOffset()) {
201             return -1;
202           }
203           return 1;
204         }
205       });
206
207   /** Statistics thread schedule pool (for heavy debugging, could remove) */
208   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
209     new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());
210
211   // Allocate or free space for the block
212   private BucketAllocator bucketAllocator;
213
214   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
215       int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
216       IOException {
217     this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
218       persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
219   }
220
221   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
222       int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
223       throws FileNotFoundException, IOException {
224     this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
225     this.writerThreads = new WriterThread[writerThreadNum];
226     long blockNumCapacity = capacity / blockSize;
227     if (blockNumCapacity >= Integer.MAX_VALUE) {
228       // Enough for about 32TB of cache!
229       throw new IllegalArgumentException("Cache capacity is too large, only supports 32TB now");
230     }
231 
232     this.cacheCapacity = capacity;
233     this.persistencePath = persistencePath;
234     this.blockSize = blockSize;
235     this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
236 
237     bucketAllocator = new BucketAllocator(capacity, bucketSizes);
238     for (int i = 0; i < writerThreads.length; ++i) {
239       writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
240     }
241 
242     assert writerQueues.size() == writerThreads.length;
243     this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();
244
245     this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);
246
247     if (ioEngine.isPersistent() && persistencePath != null) {
248       try {
249         retrieveFromFile(bucketSizes);
250       } catch (IOException ioex) {
251         LOG.error("Can't restore from file because of", ioex);
252       } catch (ClassNotFoundException cnfe) {
253         LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
254         throw new RuntimeException(cnfe);
255       }
256     }
257     final String threadName = Thread.currentThread().getName();
258     this.cacheEnabled = true;
259     for (int i = 0; i < writerThreads.length; ++i) {
260       writerThreads[i] = new WriterThread(writerQueues.get(i));
261       writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
262       writerThreads[i].setDaemon(true);
263     }
264     startWriterThreads();
265
266     // Run the statistics thread periodically to print the cache statistics log
267     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
268     // every five minutes.
269     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
270         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
271     LOG.info("Started bucket cache; ioengine=" + ioEngineName +
272         ", capacity=" + StringUtils.byteDesc(capacity) +
273       ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
274         writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
275       persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
276   }
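  // Construction sketch (illustrative values only): a 1GB off-heap cache with 64KB approximate
  // block size, default writer threads/queue length and no persistence file:
  //
  //   BucketCache bc = new BucketCache("offheap", 1024L * 1024 * 1024, 64 * 1024,
  //       null, DEFAULT_WRITER_THREADS, DEFAULT_WRITER_QUEUE_ITEMS, null);
  //
  // A null bucketSizes array lets BucketAllocator fall back to its built-in bucket sizes.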
277
278   /**
279    * Called by the constructor to start the writer threads. Used by tests that need to override
280    * starting the threads.
281    */
282   @VisibleForTesting
283   protected void startWriterThreads() {
284     for (WriterThread thread : writerThreads) {
285       thread.start();
286     }
287   }
288 
289   @VisibleForTesting
290   boolean isCacheEnabled() {
291     return this.cacheEnabled;
292   }
293
294   public long getMaxSize() {
295     return this.cacheCapacity;
296   }
297
298   public String getIoEngine() {
299     return ioEngine.toString();
300   }
301
302   /**
303    * Get the IOEngine from the IO engine name
304    * @param ioEngineName Name of the IO engine: "file:<path>", "offheap", "heap" or "mmap:<path>"
305    * @param capacity Capacity of the IO engine in bytes
306    * @return the IOEngine
307    * @throws IOException
308    */
309   private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
310       throws IOException {
311     if (ioEngineName.startsWith("file:")) {
312       return new FileIOEngine(ioEngineName.substring(5), capacity);
313     } else if (ioEngineName.startsWith("offheap")) {
314       return new ByteBufferIOEngine(capacity, true);
315     } else if (ioEngineName.startsWith("heap")) {
316       return new ByteBufferIOEngine(capacity, false);
317     } else if (ioEngineName.startsWith("mmap:")) {
318       return new FileMmapEngine(ioEngineName.substring(5), capacity);
319     } else {
320       throw new IllegalArgumentException(
321           "Don't understand io engine name for cache - prefix with file:, heap or offheap");
322     }
323   }
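  // Example engine names accepted above (paths are illustrative):
  //   "file:/mnt/ssd/bucket.cache"  -> FileIOEngine backed by that file
  //   "offheap"                     -> ByteBufferIOEngine with direct buffers
  //   "heap"                        -> ByteBufferIOEngine with on-heap buffers
  //   "mmap:/mnt/ssd/bucket.cache"  -> FileMmapEngine over a memory-mapped file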
324
325   /**
326    * Cache the block with the specified name and buffer.
327    * @param cacheKey block's cache key
328    * @param buf block buffer
329    */
330   @Override
331   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
332     cacheBlock(cacheKey, buf, false, false);
333   }
334
335   /**
336    * Cache the block with the specified name and buffer.
337    * @param cacheKey block's cache key
338    * @param cachedItem block buffer
339    * @param inMemory if block is in-memory
340    * @param cacheDataInL1 unused by this implementation
341    */
342   @Override
343   public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
344       final boolean cacheDataInL1) {
345     cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
346   }
347
348   /**
349    * Cache the block to ramCache
350    * @param cacheKey block's cache key
351    * @param cachedItem block buffer
352    * @param inMemory if block is in-memory
353    * @param wait if true, blocking wait when queue is full
354    */
355   public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
356       boolean wait) {
357     if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem);
358     if (!cacheEnabled) {
359       return;
360     }
361
362     if (backingMap.containsKey(cacheKey)) {
363       return;
364     }
365
366     /*
367      * Stuff the entry into the RAM cache so it can get drained to the persistent store
368      */
369     RAMQueueEntry re =
370         new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
371     if (ramCache.putIfAbsent(cacheKey, re) != null) {
372       return;
373     }
374     int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
375     BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
376     boolean successfulAddition = false;
377     if (wait) {
378       try {
379         successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
380       } catch (InterruptedException e) {
381         Thread.currentThread().interrupt();
382       }
383     } else {
384       successfulAddition = bq.offer(re);
385     }
386     if (!successfulAddition) {
387       ramCache.remove(cacheKey);
388       cacheStats.failInsert();
389     } else {
390       this.blockNumber.incrementAndGet();
391       this.heapSize.addAndGet(cachedItem.heapSize());
392       blocksByHFile.add(cacheKey);
393     }
394   }
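  // Lifecycle sketch for a cached block: it first lands in ramCache here, a WriterThread
  // drains it to the IOEngine (see doDrain below), and only then does an entry appear in
  // backingMap. getBlock checks ramCache first, then backingMap, so a block stays visible
  // to readers during the whole journey.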
395
396   /**
397    * Get the buffer of the block with the specified key.
398    * @param key block's cache key
399    * @param caching true if the caller caches blocks on cache misses
400    * @param repeat Whether this is a repeat lookup for the same block
401    * @param updateCacheMetrics Whether we should update cache metrics or not
402    * @return buffer of specified cache key, or null if not in cache
403    */
404   @Override
405   public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
406       boolean updateCacheMetrics) {
407     if (!cacheEnabled) {
408       return null;
409     }
410     RAMQueueEntry re = ramCache.get(key);
411     if (re != null) {
412       if (updateCacheMetrics) {
413         cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
414       }
415       re.access(accessCount.incrementAndGet());
416       return re.getData();
417     }
418     BucketEntry bucketEntry = backingMap.get(key);
419     if (bucketEntry != null) {
420       long start = System.nanoTime();
421       ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
422       try {
423         lock.readLock().lock();
424         // We cannot read here even if backingMap does contain the given key, because the
425         // entry's offset may have changed. If we locked on BlockCacheKey instead of offset,
426         // we could only check existence here.
427         if (bucketEntry.equals(backingMap.get(key))) {
428           // TODO : change this area - should be removed after server cells and
429           // 12295 are available
430           int len = bucketEntry.getLength();
431           if (LOG.isTraceEnabled()) {
432             LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
433           }
434           Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
435               bucketEntry.deserializerReference(this.deserialiserMap));
436           long timeTaken = System.nanoTime() - start;
437           if (updateCacheMetrics) {
438             cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
439             cacheStats.ioHit(timeTaken);
440           }
441           if (cachedBlock.getMemoryType() == MemoryType.SHARED) {
442             bucketEntry.refCount.incrementAndGet();
443           }
444           bucketEntry.access(accessCount.incrementAndGet());
445           if (this.ioErrorStartTime > 0) {
446             ioErrorStartTime = -1;
447           }
448           return cachedBlock;
449         }
450       } catch (IOException ioex) {
451         LOG.error("Failed reading block " + key + " from bucket cache", ioex);
452         checkIOErrorIsTolerated();
453       } finally {
454         lock.readLock().unlock();
455       }
456     }
457     if (!repeat && updateCacheMetrics) {
458       cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
459     }
460     return null;
461   }
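  // Note on the refCount bump above: for MemoryType.SHARED blocks the returned Cacheable
  // references the IOEngine's own memory, so callers must hand it back via returnBlock(...)
  // (further down), which decrements refCount and performs the deferred eviction of entries
  // flagged markedForEvict.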
462
463   @VisibleForTesting
464   void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
465     bucketAllocator.freeBlock(bucketEntry.offset());
466     realCacheSize.addAndGet(-1 * bucketEntry.getLength());
467     blocksByHFile.remove(cacheKey);
468     if (decrementBlockNumber) {
469       this.blockNumber.decrementAndGet();
470     }
471   }
472 
473   @Override
474   public boolean evictBlock(BlockCacheKey cacheKey) {
475     return evictBlock(cacheKey, true);
476   }
477
478   // Does not check the ref count. Just tries to evict the block if it is
479   // found in the backing map.
480   private boolean forceEvict(BlockCacheKey cacheKey) {
481     if (!cacheEnabled) {
482       return false;
483     }
484     RAMQueueEntry removedBlock = checkRamCache(cacheKey);
485     BucketEntry bucketEntry = backingMap.get(cacheKey);
486     if (bucketEntry == null) {
487       if (removedBlock != null) {
488         cacheStats.evicted(0, cacheKey.isPrimary());
489         return true;
490       } else {
491         return false;
492       }
493     }
494     ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
495     try {
496       lock.writeLock().lock();
497       if (backingMap.remove(cacheKey, bucketEntry)) {
498         blockEvicted(cacheKey, bucketEntry, removedBlock == null);
499       } else {
500         return false;
501       }
502     } finally {
503       lock.writeLock().unlock();
504     }
505     cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
506     return true;
507   }
508
509   private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
510     RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
511     if (removedBlock != null) {
512       this.blockNumber.decrementAndGet();
513       this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
514     }
515     return removedBlock;
516   }
517
518   public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
519     if (!cacheEnabled) {
520       return false;
521     }
522     RAMQueueEntry removedBlock = checkRamCache(cacheKey);
523     BucketEntry bucketEntry = backingMap.get(cacheKey);
524     if (bucketEntry == null) {
525       if (removedBlock != null) {
526         cacheStats.evicted(0, cacheKey.isPrimary());
527         return true;
528       } else {
529         return false;
530       }
531     }
532     ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
533     try {
534       lock.writeLock().lock();
535       int refCount = bucketEntry.refCount.get();
536       if (refCount == 0) {
537         if (backingMap.remove(cacheKey, bucketEntry)) {
538           blockEvicted(cacheKey, bucketEntry, removedBlock == null);
539         } else {
540           return false;
541         }
542       } else {
543         if (!deletedBlock) {
544           if (LOG.isDebugEnabled()) {
545             LOG.debug("This block " + cacheKey + " is still referred to by " + refCount
546                 + " readers. Cannot be freed now");
547           }
548           return false;
549         } else {
550           if (LOG.isDebugEnabled()) {
551             LOG.debug("This block " + cacheKey + " is still referred to by " + refCount
552                 + " readers. Cannot be freed now. Hence it will be marked"
553                 + " for eviction at a later point");
554           }
555           bucketEntry.markedForEvict = true;
556         }
557       }
558     } finally {
559       lock.writeLock().unlock();
560     }
561     cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
562     return true;
563   }
564
565   /*
566    * Statistics thread.  Periodically output cache statistics to the log.
567    */
568   private static class StatisticsThread extends Thread {
569     private final BucketCache bucketCache;
570
571     public StatisticsThread(BucketCache bucketCache) {
572       super("BucketCacheStatsThread");
573       setDaemon(true);
574       this.bucketCache = bucketCache;
575     }
576
577     @Override
578     public void run() {
579       bucketCache.logStats();
580     }
581   }
582
583   public void logStats() {
584     long totalSize = bucketAllocator.getTotalSize();
585     long usedSize = bucketAllocator.getUsedSize();
586     long freeSize = totalSize - usedSize;
587     long cacheSize = getRealCacheSize();
588     LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " +
589         "totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
590         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
591         "usedSize=" + StringUtils.byteDesc(usedSize) +", " +
592         "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " +
593         "accesses=" + cacheStats.getRequestCount() + ", " +
594         "hits=" + cacheStats.getHitCount() + ", " +
595         "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
596         "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
597         "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
598           (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
599         "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
600         "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
601         "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
602           (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
603         "evictions=" + cacheStats.getEvictionCount() + ", " +
604         "evicted=" + cacheStats.getEvictedCount() + ", " +
605         "evictedPerRun=" + cacheStats.evictedPerEviction());
606     cacheStats.reset();
607   }
608
609   public long getRealCacheSize() {
610     return this.realCacheSize.get();
611   }
612
613   private long acceptableSize() {
614     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR);
615   }
616 
617   private long singleSize() {
618     return (long) Math.floor(bucketAllocator.getTotalSize()
619         * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR);
620   }
621 
622   private long multiSize() {
623     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR
624         * DEFAULT_MIN_FACTOR);
625   }
626 
627   private long memorySize() {
628     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR
629         * DEFAULT_MIN_FACTOR);
630   }
631
632   /**
633    * Free the space if the used size reaches acceptableSize() or a block of one size
634    * couldn't be allocated. When freeing space, we use an LRU algorithm and
635    * ensure that some blocks are evicted
636    * @param why Why we are being called
637    */
638   private void freeSpace(final String why) {
639     // Ensure only one freeSpace operation is in progress at a time
640     if (!freeSpaceLock.tryLock()) {
641       return;
642     }
643     try {
644       freeInProgress = true;
645       long bytesToFreeWithoutExtra = 0;
646       // Calculate bytes to free for each bucketSizeInfo
647       StringBuffer msgBuffer = LOG.isDebugEnabled() ? new StringBuffer() : null;
648       BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
649       long[] bytesToFreeForBucket = new long[stats.length];
650       for (int i = 0; i < stats.length; i++) {
651         bytesToFreeForBucket[i] = 0;
652         long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
653         freeGoal = Math.max(freeGoal, 1);
654         if (stats[i].freeCount() < freeGoal) {
655           bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
656           bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
657           if (msgBuffer != null) {
658             msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
659               + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
660           }
661         }
662       }
663       if (msgBuffer != null) {
664         msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
665       }
666
667       if (bytesToFreeWithoutExtra <= 0) {
668         return;
669       }
670       long currentSize = bucketAllocator.getUsedSize();
671       long totalSize = bucketAllocator.getTotalSize();
672       if (LOG.isDebugEnabled() && msgBuffer != null) {
673         LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
674           " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
675           StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize));
676       }
677
678       long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
679           * (1 + DEFAULT_EXTRA_FREE_FACTOR));
680
681       // Instantiate priority buckets
682       BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
683           blockSize, singleSize());
684       BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
685           blockSize, multiSize());
686       BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
687           blockSize, memorySize());
688
689       // Scan entire map putting bucket entry into appropriate bucket entry
690       // group
691       for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
692         switch (bucketEntryWithKey.getValue().getPriority()) {
693           case SINGLE: {
694             bucketSingle.add(bucketEntryWithKey);
695             break;
696           }
697           case MULTI: {
698             bucketMulti.add(bucketEntryWithKey);
699             break;
700           }
701           case MEMORY: {
702             bucketMemory.add(bucketEntryWithKey);
703             break;
704           }
705         }
706       }
707
708       PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3);
709
710       bucketQueue.add(bucketSingle);
711       bucketQueue.add(bucketMulti);
712       bucketQueue.add(bucketMemory);
713
714       int remainingBuckets = 3;
715       long bytesFreed = 0;
716
717       BucketEntryGroup bucketGroup;
718       while ((bucketGroup = bucketQueue.poll()) != null) {
719         long overflow = bucketGroup.overflow();
720         if (overflow > 0) {
721           long bucketBytesToFree = Math.min(overflow,
722               (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
723           bytesFreed += bucketGroup.free(bucketBytesToFree);
724         }
725         remainingBuckets--;
726       }
727
728       /**
729        * Check whether extra freeing is needed because some bucketSizeInfo still
730        * needs free space
731        */
732       stats = bucketAllocator.getIndexStatistics();
733       boolean needFreeForExtra = false;
734       for (int i = 0; i < stats.length; i++) {
735         long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
736         freeGoal = Math.max(freeGoal, 1);
737         if (stats[i].freeCount() < freeGoal) {
738           needFreeForExtra = true;
739           break;
740         }
741       }
742
743       if (needFreeForExtra) {
744         bucketQueue.clear();
745         remainingBuckets = 2;
746
747         bucketQueue.add(bucketSingle);
748         bucketQueue.add(bucketMulti);
749
750         while ((bucketGroup = bucketQueue.poll()) != null) {
751           long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
752           bytesFreed += bucketGroup.free(bucketBytesToFree);
753           remainingBuckets--;
754         }
755       }
756
757       if (LOG.isDebugEnabled()) {
758         long single = bucketSingle.totalSize();
759         long multi = bucketMulti.totalSize();
760         long memory = bucketMemory.totalSize();
761         if (LOG.isDebugEnabled()) {
762           LOG.debug("Bucket cache free space completed; " + "freed="
763             + StringUtils.byteDesc(bytesFreed) + ", " + "total="
764             + StringUtils.byteDesc(totalSize) + ", " + "single="
765             + StringUtils.byteDesc(single) + ", " + "multi="
766             + StringUtils.byteDesc(multi) + ", " + "memory="
767             + StringUtils.byteDesc(memory));
768         }
769       }
770
771     } catch (Throwable t) {
772       LOG.warn("Failed freeing space", t);
773     } finally {
774       cacheStats.evict();
775       freeInProgress = false;
776       freeSpaceLock.unlock();
777     }
778   }
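  // Worked example of the proportional freeing above (illustrative numbers): suppose 300MB
  // must be freed and the groups overflow by 10MB (single), 200MB (multi) and 500MB (memory).
  // The PriorityQueue yields groups in ascending overflow order and each frees min(its
  // overflow, an equal share of what is still owed): 10MB, then min(200, 290/2) = 145MB,
  // then min(500, 145/1) = 145MB, totalling 300MB.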
779
780   // This handles flushing the RAM cache to IOEngine.
781   @VisibleForTesting
782   class WriterThread extends HasThread {
783     private final BlockingQueue<RAMQueueEntry> inputQueue;
784     private volatile boolean writerEnabled = true;
785
786     WriterThread(BlockingQueue<RAMQueueEntry> queue) {
787       this.inputQueue = queue;
788     }
789
790     // Used for test
791     @VisibleForTesting
792     void disableWriter() {
793       this.writerEnabled = false;
794     }
795
796     public void run() {
797       List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
798       try {
799         while (cacheEnabled && writerEnabled) {
800           try {
801             try {
802               // Blocks
803               entries = getRAMQueueEntries(inputQueue, entries);
804             } catch (InterruptedException ie) {
805               if (!cacheEnabled) break;
806             }
807             doDrain(entries);
808           } catch (Exception ioe) {
809             LOG.error("WriterThread encountered error", ioe);
810           }
811         }
812       } catch (Throwable t) {
813         LOG.warn("Failed doing drain", t);
814       }
815       LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
816     }
817
818     /**
819      * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
820      * Process all entries passed in, even on failure, being sure to remove them from ramCache;
821      * otherwise we'll never undo the references and we'll OOME.
822      * @param entries Presumes list passed in here will be processed by this invocation only. No
823      *   interference expected.
824      * @throws InterruptedException
825      */
826     @VisibleForTesting
827     void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
828       if (entries.isEmpty()) {
829         return;
830       }
831       // This method is a little hard to follow. We run through the passed in entries and for each
832       // successful add, we add a non-null BucketEntry to the below bucketEntries.  Later we must
833       // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
834       // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
835       // filling ramCache.  We do the clean up by again running through the passed in entries
836       // doing extra work when we find a non-null bucketEntries corresponding entry.
837       final int size = entries.size();
838       BucketEntry[] bucketEntries = new BucketEntry[size];
839       // Index is updated inside the loop on success or when we can't succeed. If the cache is
840       // full when we go to add an entry, we retry it by going around the loop without upping the index.
841       int index = 0;
842       while (cacheEnabled && index < size) {
843         RAMQueueEntry re = null;
844         try {
845           re = entries.get(index);
846           if (re == null) {
847             LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
848             index++;
849             continue;
850           }
851           BucketEntry bucketEntry =
852             re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
853           // Successfully added.  Up index and add bucketEntry. Clear io exceptions.
854           bucketEntries[index] = bucketEntry;
855           if (ioErrorStartTime > 0) {
856             ioErrorStartTime = -1;
857           }
858           index++;
859         } catch (BucketAllocatorException fle) {
860           LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
861           // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
862           bucketEntries[index] = null;
863           index++;
864         } catch (CacheFullException cfe) {
865           // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
866           if (!freeInProgress) {
867             freeSpace("Full!");
868           } else {
869             Thread.sleep(50);
870           }
871         } catch (IOException ioex) {
872           // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
873           LOG.error("Failed writing to bucket cache", ioex);
874           checkIOErrorIsTolerated();
875         }
876       }
877
878       // Make sure data pages are written on media before we update maps.
879       try {
880         ioEngine.sync();
881       } catch (IOException ioex) {
882         LOG.error("Failed syncing IO engine", ioex);
883         checkIOErrorIsTolerated();
884         // Since we failed sync, free the blocks in bucket allocator
885         for (int i = 0; i < entries.size(); ++i) {
886           if (bucketEntries[i] != null) {
887             bucketAllocator.freeBlock(bucketEntries[i].offset());
888             bucketEntries[i] = null;
889           }
890         }
891       }
892
893       // Now add to backingMap if successfully added to bucket cache.  Remove from ramCache if
894       // success or error.
895       for (int i = 0; i < size; ++i) {
896         BlockCacheKey key = entries.get(i).getKey();
897         // Only add if non-null entry.
898         if (bucketEntries[i] != null) {
899           backingMap.put(key, bucketEntries[i]);
900         }
901         // Always remove from ramCache even if we failed adding it to the block cache above.
902         RAMQueueEntry ramCacheEntry = ramCache.remove(key);
903         if (ramCacheEntry != null) {
904           heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
905         } else if (bucketEntries[i] != null){
906           // Block should have already been evicted. Remove it and free space.
907           ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
908           try {
909             lock.writeLock().lock();
910             if (backingMap.remove(key, bucketEntries[i])) {
911               blockEvicted(key, bucketEntries[i], false);
912             }
913           } finally {
914             lock.writeLock().unlock();
915           }
916         }
917       }
918
919       long used = bucketAllocator.getUsedSize();
920       if (used > acceptableSize()) {
921         freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
922       }
923       return;
924     }
925   }
926
927   /**
928    * Blocks until elements available in {@code q} then tries to grab as many as possible
929    * before returning.
930    * @param q The queue to take from.
931    * @param receptacle Where to stash the elements taken from the queue. We clear it before
932    *     use just in case.
933    * @return {@code receptacle} laden with elements taken from the queue.
934    */
935   @VisibleForTesting
936   static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
937       final List<RAMQueueEntry> receptacle)
938   throws InterruptedException {
939     // Clear sets all entries to null and sets size to 0. We retain allocations. Presume that is
940     // ok even if the list grew to accommodate thousands.
941     receptacle.clear();
942     receptacle.add(q.take());
943     q.drainTo(receptacle);
944     return receptacle;
945   }
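  // The take()/drainTo() pair above batches writes: take() blocks for the first element, then
  // drainTo() grabs everything else already queued without blocking, so each WriterThread
  // wake-up drains whatever has accumulated into a single doDrain call.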
946
947   private void persistToFile() throws IOException {
948     assert !cacheEnabled;
949     FileOutputStream fos = null;
950     ObjectOutputStream oos = null;
951     try {
952       if (!ioEngine.isPersistent()) {
953         throw new IOException("Attempt to persist non-persistent cache mappings!");
954       }
955       fos = new FileOutputStream(persistencePath, false);
956       oos = new ObjectOutputStream(fos);
957       oos.writeLong(cacheCapacity);
958       oos.writeUTF(ioEngine.getClass().getName());
959       oos.writeUTF(backingMap.getClass().getName());
960       oos.writeObject(deserialiserMap);
961       oos.writeObject(backingMap);
962     } finally {
963       if (oos != null) oos.close();
964       if (fos != null) fos.close();
965     }
966   }
967
968   @SuppressWarnings("unchecked")
969   private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
970       ClassNotFoundException {
971     File persistenceFile = new File(persistencePath);
972     if (!persistenceFile.exists()) {
973       return;
974     }
975     assert !cacheEnabled;
976     FileInputStream fis = null;
977     ObjectInputStream ois = null;
978     try {
979       if (!ioEngine.isPersistent())
980         throw new IOException(
981             "Attempt to restore non-persistent cache mappings!");
982       fis = new FileInputStream(persistencePath);
983       ois = new ObjectInputStream(fis);
984       long capacitySize = ois.readLong();
985       if (capacitySize != cacheCapacity)
986         throw new IOException("Mismatched cache capacity:"
987             + StringUtils.byteDesc(capacitySize) + ", expected: "
988             + StringUtils.byteDesc(cacheCapacity));
989       String ioclass = ois.readUTF();
990       String mapclass = ois.readUTF();
991       if (!ioEngine.getClass().getName().equals(ioclass))
992         throw new IOException("Class name for IO engine mismatch: " + ioclass
993             + ", expected:" + ioEngine.getClass().getName());
994       if (!backingMap.getClass().getName().equals(mapclass))
995         throw new IOException("Class name for cache map mismatch: " + mapclass
996             + ", expected:" + backingMap.getClass().getName());
997       UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
998           .readObject();
999       ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
1000           (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
1001       BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
1002           backingMapFromFile, realCacheSize);
1003       bucketAllocator = allocator;
1004       deserialiserMap = deserMap;
1005       backingMap = backingMapFromFile;
1006     } finally {
1007       if (ois != null) ois.close();
1008       if (fis != null) fis.close();
1009       if (!persistenceFile.delete()) {
1010         throw new IOException("Failed deleting persistence file "
1011             + persistenceFile.getAbsolutePath());
1012       }
1013     }
1014   }
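  // Persisted file layout, as written by persistToFile and read back above:
  //   long   cacheCapacity
  //   UTF    ioEngine class name      (must match on restore)
  //   UTF    backingMap class name    (must match on restore)
  //   Object deserialiserMap
  //   Object backingMap
  // Any mismatch aborts the restore and the cache starts cold.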
1015
1016   /**
1017    * Check whether we tolerate the IO error this time. If the duration of the IOEngine
1018    * throwing errors exceeds ioErrorsTolerationDuration, we will disable the
1019    * cache
1020    */
1021   private void checkIOErrorIsTolerated() {
1022     long now = EnvironmentEdgeManager.currentTime();
1023     if (this.ioErrorStartTime > 0) {
1024       if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
1025         LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
1026           "ms, disabing cache, please check your IOEngine");
1027         disableCache();
1028       }
1029     } else {
1030       this.ioErrorStartTime = now;
1031     }
1032   }
1033
1034   /**
1035    * Used to shut down the cache -or- turn it off in the case of something broken.
1036    */
1037   private void disableCache() {
1038     if (!cacheEnabled) return;
1039     cacheEnabled = false;
1040     ioEngine.shutdown();
1041     this.scheduleThreadPool.shutdown();
1042     for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt();
1043     this.ramCache.clear();
1044     if (!ioEngine.isPersistent() || persistencePath == null) {
1045       // No persistent ioengine or no path, so just clear; otherwise backingMap is persisted at shutdown.
1046       this.backingMap.clear();
1047     }
1048   }
1049
1050   private void join() throws InterruptedException {
1051     for (int i = 0; i < writerThreads.length; ++i)
1052       writerThreads[i].join();
1053   }
1054
1055   @Override
1056   public void shutdown() {
1057     disableCache();
1058     LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
1059         + "; path to write=" + persistencePath);
1060     if (ioEngine.isPersistent() && persistencePath != null) {
1061       try {
1062         join();
1063         persistToFile();
1064       } catch (IOException ex) {
1065         LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1066       } catch (InterruptedException e) {
1067         LOG.warn("Failed to persist data on exit", e);
1068       }
1069     }
1070   }
1071
1072   @Override
1073   public CacheStats getStats() {
1074     return cacheStats;
1075   }
1076
1077   public BucketAllocator getAllocator() {
1078     return this.bucketAllocator;
1079   }
1080
1081   @Override
1082   public long heapSize() {
1083     return this.heapSize.get();
1084   }
1085
1086   @Override
1087   public long size() {
1088     return this.realCacheSize.get();
1089   }
1090
1091   @Override
1092   public long getFreeSize() {
1093     return this.bucketAllocator.getFreeSize();
1094   }
1095
1096   @Override
1097   public long getBlockCount() {
1098     return this.blockNumber.get();
1099   }
1100
1101   @Override
1102   public long getCurrentSize() {
1103     return this.bucketAllocator.getUsedSize();
1104   }
1105
1106   /**
1107    * Evicts all blocks for a specific HFile.
1108    * <p>
1109    * This is used for evict-on-close to remove all blocks of a specific HFile.
1110    *
1111    * @return the number of blocks evicted
1112    */
1113   @Override
1114   public int evictBlocksByHfileName(String hfileName) {
1115     Set<BlockCacheKey> keySet = blocksByHFile.subSet(
1116         new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
1117         new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
1118
1119     int numEvicted = 0;
1120     for (BlockCacheKey key : keySet) {
1121       if (evictBlock(key)) {
1122           ++numEvicted;
1123       }
1124     }
1125
1126     return numEvicted;
1127   }
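  // The subSet call above works because blocksByHFile is ordered by (hfileName, offset);
  // bracketing the name with Long.MIN_VALUE and Long.MAX_VALUE offsets therefore selects
  // every block of that HFile, e.g.:
  //   cache.evictBlocksByHfileName("someHFile"); // "someHFile" is a hypothetical name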
1128
1129   /**
1130    * Item in cache. We expect this to be where most memory goes. Java uses 8
1131    * bytes just for object headers; after this, we want to use as little as
1132    * possible - so we only use 8 bytes, but in order to do so we end up messing
1133    * around with all this Java casting stuff. Offset stored as 5 bytes that make
1134    * up the long. Doubt we'll see devices this big for ages. Offsets are divided
1135    * by 256. So 5 bytes gives us 256TB or so.
1136    */
1137   static class BucketEntry implements Serializable {
1138     private static final long serialVersionUID = -6741504807982257534L;
1139
1140     // access counter comparator, descending order
1141     static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
1142
1143       @Override
1144       public int compare(BucketEntry o1, BucketEntry o2) {
1145         return Long.compare(o2.accessCounter, o1.accessCounter);
1146       }
1147     };
1148
1149     private int offsetBase;
1150     private int length;
1151     private byte offset1;
1152     byte deserialiserIndex;
1153     private volatile long accessCounter;
1154     private BlockPriority priority;
1155     // Set this when we were not able to forcefully evict the block
1156     private volatile boolean markedForEvict;
1157     private AtomicInteger refCount = new AtomicInteger(0);
1158
1159     /**
1160      * Time this block was cached.  Presumes we are created just before we are added to the cache.
1161      */
1162     private final long cachedTime = System.nanoTime();
1163
1164     BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
1165       setOffset(offset);
1166       this.length = length;
1167       this.accessCounter = accessCounter;
1168       if (inMemory) {
1169         this.priority = BlockPriority.MEMORY;
1170       } else {
1171         this.priority = BlockPriority.SINGLE;
1172       }
1173     }
1174
1175     long offset() { // Java has no unsigned numbers
1176       long o = ((long) offsetBase) & 0xFFFFFFFFL; // L suffix so the mask isn't sign-extended
1177       o += (((long) (offset1)) & 0xFF) << 32;
1178       return o << 8;
1179     }
1180
1181     private void setOffset(long value) {
1182       assert (value & 0xFF) == 0;
1183       value >>= 8;
1184       offsetBase = (int) value;
1185       offset1 = (byte) (value >> 32);
1186     }
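    // Worked example of the 5-byte encoding above (illustrative offset): for
    // offset 0x012345678900 (the bottom 8 bits must be zero),
    //   setOffset: value >>= 8                  -> 0x0123456789
    //              offsetBase = (int) value     -> 0x23456789
    //              offset1 = (byte)(value >> 32) -> 0x01
    //   offset():  (0x23456789L | (0x01L << 32)) << 8 -> 0x012345678900 again.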
1187
1188     public int getLength() {
1189       return length;
1190     }
1191
1192     protected CacheableDeserializer<Cacheable> deserializerReference(
1193         UniqueIndexMap<Integer> deserialiserMap) {
1194       return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
1195           .unmap(deserialiserIndex));
1196     }
1197
1198     protected void setDeserialiserReference(
1199         CacheableDeserializer<Cacheable> deserializer,
1200         UniqueIndexMap<Integer> deserialiserMap) {
1201       this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
1202           .getDeserialiserIdentifier()));
1203     }
1204
1205     /**
1206      * Block has been accessed. Update its local access counter.
1207      */
1208     public void access(long accessCounter) {
1209       this.accessCounter = accessCounter;
1210       if (this.priority == BlockPriority.SINGLE) {
1211         this.priority = BlockPriority.MULTI;
1212       }
1213     }
1214
1215     public BlockPriority getPriority() {
1216       return this.priority;
1217     }
1218
1219     public long getCachedTime() {
1220       return cachedTime;
1221     }
1222   }
1223
1224   /**
1225    * Used to group bucket entries into priority buckets. There will be a
1226    * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
1227    * the eviction algorithm takes the appropriate number of elements out of each
1228    * according to configuration parameters and their relative sizes.
1229    */
1230   private class BucketEntryGroup implements Comparable<BucketEntryGroup> {
1231
1232     private CachedEntryQueue queue;
1233     private long totalSize = 0;
1234     private long bucketSize;
1235
1236     public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1237       this.bucketSize = bucketSize;
1238       queue = new CachedEntryQueue(bytesToFree, blockSize);
1239       totalSize = 0;
1240     }
1241
1242     public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1243       totalSize += block.getValue().getLength();
1244       queue.add(block);
1245     }
1246
1247     public long free(long toFree) {
1248       Map.Entry<BlockCacheKey, BucketEntry> entry;
1249       long freedBytes = 0;
1250       // TODO avoid a cycling situation. We may find no block that is not in use, leaving no way
1251       // to free. What to do then? Fail the caching attempt? Need some changes in cacheBlock API?
1252       while ((entry = queue.pollLast()) != null) {
1253         if (evictBlock(entry.getKey(), false)) {
1254           freedBytes += entry.getValue().getLength();
1255         }
1256         if (freedBytes >= toFree) {
1257           return freedBytes;
1258         }
1259       }
1260       return freedBytes;
1261     }
1262
1263     public long overflow() {
1264       return totalSize - bucketSize;
1265     }
1266
1267     public long totalSize() {
1268       return totalSize;
1269     }
1270
1271     @Override
1272     public int compareTo(BucketEntryGroup that) {
1273       return Long.compare(this.overflow(), that.overflow());
1274     }
1275
1276     @Override
1277     public boolean equals(Object that) {
1278       return this == that;
1279     }
1280
1281   }
1282
1283   /**
1284    * Block entry stored in memory with its key, data and so on
1285    */
1286   @VisibleForTesting
1287   static class RAMQueueEntry {
1288     private BlockCacheKey key;
1289     private Cacheable data;
1290     private long accessCounter;
1291     private boolean inMemory;
1292
1293     public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
1294         boolean inMemory) {
1295       this.key = bck;
1296       this.data = data;
1297       this.accessCounter = accessCounter;
1298       this.inMemory = inMemory;
1299     }
1300
1301     public Cacheable getData() {
1302       return data;
1303     }
1304
1305     public BlockCacheKey getKey() {
1306       return key;
1307     }
1308
1309     public void access(long accessCounter) {
1310       this.accessCounter = accessCounter;
1311     }
1312
1313     public BucketEntry writeToCache(final IOEngine ioEngine,
1314         final BucketAllocator bucketAllocator,
1315         final UniqueIndexMap<Integer> deserialiserMap,
1316         final AtomicLong realCacheSize) throws CacheFullException, IOException,
1317         BucketAllocatorException {
1318       int len = data.getSerializedLength();
1319       // This cacheable thing can't be serialized
1320       if (len == 0) return null;
1321       long offset = bucketAllocator.allocateBlock(len);
1322       BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
1323       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
1324       try {
1325         if (data instanceof HFileBlock) {
1326           // If an instance of HFileBlock, save on some allocations.
1327           HFileBlock block = (HFileBlock)data;
1328           ByteBuff sliceBuf = block.getBufferReadOnly();
1329           ByteBuffer metadata = block.getMetaData();
1330           if (LOG.isTraceEnabled()) {
1331             LOG.trace("Write offset=" + offset + ", len=" + len);
1332           }
1333           ioEngine.write(sliceBuf, offset);
1334           ioEngine.write(metadata, offset + len - metadata.limit());
1335         } else {
1336           ByteBuffer bb = ByteBuffer.allocate(len);
1337           data.serialize(bb);
1338           ioEngine.write(bb, offset);
1339         }
1340       } catch (IOException ioe) {
1341         // free it in bucket allocator
1342         bucketAllocator.freeBlock(offset);
1343         throw ioe;
1344       }
1345
1346       realCacheSize.addAndGet(len);
1347       return bucketEntry;
1348     }
1349   }
1350
1351   /**
1352    * Only used in test
1353    * @throws InterruptedException
1354    */
1355   void stopWriterThreads() throws InterruptedException {
1356     for (WriterThread writerThread : writerThreads) {
1357       writerThread.disableWriter();
1358       writerThread.interrupt();
1359       writerThread.join();
1360     }
1361   }
1362
1363   @Override
1364   public Iterator<CachedBlock> iterator() {
1365     // Don't bother with ramcache since stuff is in here only a little while.
1366     final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
1367         this.backingMap.entrySet().iterator();
1368     return new Iterator<CachedBlock>() {
1369       private final long now = System.nanoTime();
1370
1371       @Override
1372       public boolean hasNext() {
1373         return i.hasNext();
1374       }
1375
1376       @Override
1377       public CachedBlock next() {
1378         final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1379         return new CachedBlock() {
1380           @Override
1381           public String toString() {
1382             return BlockCacheUtil.toString(this, now);
1383           }
1384
1385           @Override
1386           public BlockPriority getBlockPriority() {
1387             return e.getValue().getPriority();
1388           }
1389
1390           @Override
1391           public BlockType getBlockType() {
1392             // Not held by BucketEntry.  Could add it if wanted on BucketEntry creation.
1393             return null;
1394           }
1395
1396           @Override
1397           public long getOffset() {
1398             return e.getKey().getOffset();
1399           }
1400
1401           @Override
1402           public long getSize() {
1403             return e.getValue().getLength();
1404           }
1405
1406           @Override
1407           public long getCachedTime() {
1408             return e.getValue().getCachedTime();
1409           }
1410
1411           @Override
1412           public String getFilename() {
1413             return e.getKey().getHfileName();
1414           }
1415
1416           @Override
1417           public int compareTo(CachedBlock other) {
1418             int diff = this.getFilename().compareTo(other.getFilename());
1419             if (diff != 0) return diff;
1420
1421             diff = Long.compare(this.getOffset(), other.getOffset());
1422             if (diff != 0) return diff;
1423             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1424               throw new IllegalStateException("" + this.getCachedTime() + ", " +
1425                 other.getCachedTime());
1426             }
1427             return Long.compare(other.getCachedTime(), this.getCachedTime());
1428           }
1429
1430           @Override
1431           public int hashCode() {
1432             return e.getKey().hashCode();
1433           }
1434
1435           @Override
1436           public boolean equals(Object obj) {
1437             if (obj instanceof CachedBlock) {
1438               CachedBlock cb = (CachedBlock)obj;
1439               return compareTo(cb) == 0;
1440             } else {
1441               return false;
1442             }
1443           }
1444         };
1445       }
1446
1447       @Override
1448       public void remove() {
1449         throw new UnsupportedOperationException();
1450       }
1451     };
1452   }
1453
1454   @Override
1455   public BlockCache[] getBlockCaches() {
1456     return null;
1457   }
1458
1459   @Override
1460   public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
1461     if (block.getMemoryType() == MemoryType.SHARED) {
1462       BucketEntry bucketEntry = backingMap.get(cacheKey);
1463       if (bucketEntry != null) {
1464         int refCount = bucketEntry.refCount.decrementAndGet();
1465         if (bucketEntry.markedForEvict && refCount == 0) {
1466           forceEvict(cacheKey);
1467         }
1468       }
1469     }
1470   }
1471
1472   @VisibleForTesting
1473   public int getRefCount(BlockCacheKey cacheKey) {
1474     BucketEntry bucketEntry = backingMap.get(cacheKey);
1475     if (bucketEntry != null) {
1476       return bucketEntry.refCount.get();
1477     }
1478     return 0;
1479   }
1480 }