/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.util.StringUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
 * BucketCache#ramCache and BucketCache#backingMap in order to
 * determine if a given element is in the cache. The bucket cache can store/read
 * the block data in on-heap or off-heap memory ({@link ByteBufferIOEngine}) or
 * in a file ({@link FileIOEngine}).
 *
 * <p>Eviction uses an algorithm similar to that of
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}.
 *
 * <p>BucketCache is mainly used as a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
 * LruBlockCache to decrease CMS GC and heap fragmentation.
 *
 * <p>It can also be used as a secondary cache (e.g. using a file on ssd/fusionio to store
 * blocks) to enlarge cache space via
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}.
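 *
 * <p>A minimal construction sketch (illustrative values only, not recommendations;
 * a null bucket-sizes array is assumed here to fall back to the allocator's defaults):
 * <pre>
 *   // 128MB off-heap cache with a 64KB approximate block size and no
 *   // persistence path (mappings are not saved across restarts).
 *   BucketCache cache = new BucketCache("offheap", 128 * 1024 * 1024, 65536,
 *       null, DEFAULT_WRITER_THREADS, DEFAULT_WRITER_QUEUE_ITEMS, null);
 * </pre>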
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Log LOG = LogFactory.getLog(BucketCache.class);

  /** Priority bucket factors */
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;

  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
  private static final float DEFAULT_MIN_FACTOR = 0.85f;

  /** Statistics thread period, in seconds */
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  // Store/read block data
  final IOEngine ioEngine;

  // Store the block in this map before writing it to cache
  @VisibleForTesting
  final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
  // In this map, store the block's metadata such as offset and length
  @VisibleForTesting
  ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;

  /**
   * Flag indicating whether the cache is enabled. We shut it off if there are IO
   * errors for some time, so that Bucket IO exceptions/errors don't bring down
   * the HBase server.
   */
  private volatile boolean cacheEnabled;

  /**
   * A list of writer queues. We have a queue per {@link WriterThread} we have running.
   * In other words, the work of adding blocks to the BucketCache is divided up amongst the
   * running WriterThreads. It's done by taking the hash of the cache key modulo the queue count.
   * A WriterThread, when it runs, takes whatever has been recently added and 'drains' the entries
   * to the BucketCache. It then updates the ramCache and backingMap accordingly.
   */
  @VisibleForTesting
  final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
      new ArrayList<BlockingQueue<RAMQueueEntry>>();
  @VisibleForTesting
  final WriterThread[] writerThreads;

  /** Volatile boolean to track whether a free-space run is in progress or not */
  private volatile boolean freeInProgress = false;
  private final Lock freeSpaceLock = new ReentrantLock();

  private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();

  private final AtomicLong realCacheSize = new AtomicLong(0);
  private final AtomicLong heapSize = new AtomicLong(0);
  /** Current number of cached elements */
  private final AtomicLong blockNumber = new AtomicLong(0);

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong(0);

  private static final int DEFAULT_CACHE_WAIT_TIME = 50;
  // Currently only used in tests. If the flag is false and cache insertion is very fast,
  // the bucket cache will skip some blocks when caching. If the flag is true, we
  // will wait until blocks are flushed to the IOEngine when caching.
  boolean wait_when_cache = false;

  private final BucketCacheStats cacheStats = new BucketCacheStats();

  private final String persistencePath;
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable the cache, 1 min by default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of the first IO error when reading or writing the IO engine; it will be
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset.
   * The purpose of this is to avoid freeing the block which is being read.
   */
  @VisibleForTesting
  final IdReadWriteLock offsetLock = new IdReadWriteLock();

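  // Keys are ordered by hfile name first and then by offset, so all blocks of one hfile
  // form a contiguous range; evictBlocksByHfileName relies on this to range-scan the set
  // with subSet().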
  private final NavigableSet<BlockCacheKey> blocksByHFile =
      new ConcurrentSkipListSet<BlockCacheKey>(new Comparator<BlockCacheKey>() {
        @Override
        public int compare(BlockCacheKey a, BlockCacheKey b) {
          int nameComparison = a.getHfileName().compareTo(b.getHfileName());
          if (nameComparison != 0) {
            return nameComparison;
          }

          if (a.getOffset() == b.getOffset()) {
            return 0;
          } else if (a.getOffset() < b.getOffset()) {
            return -1;
          }
          return 1;
        }
      });

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
    new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private BucketAllocator bucketAllocator;

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
      IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
      throws FileNotFoundException, IOException {
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
    this.writerThreads = new WriterThread[writerThreadNum];
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
      throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
    }

    this.cacheCapacity = capacity;
    this.persistencePath = persistencePath;
    this.blockSize = blockSize;
    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;

    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
    for (int i = 0; i < writerThreads.length; ++i) {
      writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
    }

    assert writerQueues.size() == writerThreads.length;
    this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();

    this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);

    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        retrieveFromFile(bucketSizes);
      } catch (IOException ioex) {
        LOG.error("Can't restore from file because of", ioex);
      } catch (ClassNotFoundException cnfe) {
        LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe);
        throw new RuntimeException(cnfe);
      }
    }
    final String threadName = Thread.currentThread().getName();
    this.cacheEnabled = true;
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i] = new WriterThread(writerQueues.get(i));
      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      writerThreads[i].setDaemon(true);
    }
    startWriterThreads();

    // Run the statistics thread periodically to print the cache statistics log
    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
    LOG.info("Started bucket cache; ioengine=" + ioEngineName +
        ", capacity=" + StringUtils.byteDesc(capacity) +
        ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
        writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
        persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
  }

  /**
   * Called by the constructor to start the writer threads. Used by tests that need to override
   * starting the threads.
   */
  @VisibleForTesting
  protected void startWriterThreads() {
    for (WriterThread thread : writerThreads) {
      thread.start();
    }
  }

  @VisibleForTesting
  boolean isCacheEnabled() {
    return this.cacheEnabled;
  }

  public long getMaxSize() {
    return this.cacheCapacity;
  }

  public String getIoEngine() {
    return ioEngine.toString();
  }

  /**
   * Get the IOEngine from the IO engine name
   * @param ioEngineName name of the IO engine, prefixed with file:, offheap, heap or mmap:
   * @param capacity cache capacity, in bytes
   * @return the IOEngine
   * @throws IOException if the engine cannot be instantiated
   */
  private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
      throws IOException {
    if (ioEngineName.startsWith("file:")) {
      return new FileIOEngine(ioEngineName.substring(5), capacity);
    } else if (ioEngineName.startsWith("offheap")) {
      return new ByteBufferIOEngine(capacity, true);
    } else if (ioEngineName.startsWith("heap")) {
      return new ByteBufferIOEngine(capacity, false);
    } else if (ioEngineName.startsWith("mmap:")) {
      return new FileMmapEngine(ioEngineName.substring(5), capacity);
    } else {
      throw new IllegalArgumentException(
          "Don't understand io engine name for cache - prefix with file:, heap, offheap or mmap:");
    }
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param buf block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   * @param inMemory if block is in-memory
   * @param cacheDataInL1 whether to cache data blocks in the L1 tier (not used by this
   *   implementation)
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
      final boolean cacheDataInL1) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
  }

  /**
   * Cache the block to ramCache
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   * @param inMemory if block is in-memory
   * @param wait if true, blocking wait when queue is full
   */
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
      boolean wait) {
    if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem);
    if (!cacheEnabled) {
      return;
    }

    if (backingMap.containsKey(cacheKey)) {
      return;
    }

    /*
     * Stuff the entry into the RAM cache so it can get drained to the persistent store
     */
    RAMQueueEntry re =
        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
    if (ramCache.putIfAbsent(cacheKey, re) != null) {
      return;
    }
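    // Pick a writer queue deterministically from the key's hash; the 0x7FFFFFFF mask
    // clears the sign bit so the modulo result is never negative.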
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = false;
    if (wait) {
      try {
        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      ramCache.remove(cacheKey);
      cacheStats.failInsert();
    } else {
      this.blockNumber.incrementAndGet();
      this.heapSize.addAndGet(cachedItem.heapSize());
      blocksByHFile.add(cacheKey);
    }
  }

  /**
   * Get the buffer of the block with the specified key.
   * @param key block's cache key
   * @param caching true if the caller caches blocks on cache misses
   * @param repeat Whether this is a repeat lookup for the same block
   * @param updateCacheMetrics Whether we should update cache metrics or not
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    if (!cacheEnabled) {
      return null;
    }
    RAMQueueEntry re = ramCache.get(key);
    if (re != null) {
      if (updateCacheMetrics) {
        cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
      }
      re.access(accessCount.incrementAndGet());
      return re.getData();
    }
    BucketEntry bucketEntry = backingMap.get(key);
    if (bucketEntry != null) {
      long start = System.nanoTime();
      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
      try {
        lock.readLock().lock();
        // We can not read here without this check: even if backingMap still contains the
        // given key, the entry's offset may have changed in the meantime. If we locked on
        // the BlockCacheKey instead of the offset, an existence check alone would do here.
        if (bucketEntry.equals(backingMap.get(key))) {
          // TODO : change this area - should be removed after server cells and
          // 12295 are available
          int len = bucketEntry.getLength();
          if (LOG.isTraceEnabled()) {
            LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
          }
          Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
              bucketEntry.deserializerReference(this.deserialiserMap));
          long timeTaken = System.nanoTime() - start;
          if (updateCacheMetrics) {
            cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
            cacheStats.ioHit(timeTaken);
          }
          if (cachedBlock.getMemoryType() == MemoryType.SHARED) {
            bucketEntry.refCount.incrementAndGet();
          }
          bucketEntry.access(accessCount.incrementAndGet());
          if (this.ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          return cachedBlock;
        }
      } catch (IOException ioex) {
        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
        checkIOErrorIsTolerated();
      } finally {
        lock.readLock().unlock();
      }
    }
    if (!repeat && updateCacheMetrics) {
      cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
    }
    return null;
  }

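  /**
   * Free the backing allocation for an evicted block and update the bookkeeping:
   * the real cache size, the per-hfile index and, optionally, the block count.
   */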
  @VisibleForTesting
  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
    bucketAllocator.freeBlock(bucketEntry.offset());
    realCacheSize.addAndGet(-1 * bucketEntry.getLength());
    blocksByHFile.remove(cacheKey);
    if (decrementBlockNumber) {
      this.blockNumber.decrementAndGet();
    }
  }

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    return evictBlock(cacheKey, true);
  }

  // Does not check the reference count. Just tries to evict the block if found in the
  // backing map.
  private boolean forceEvict(BlockCacheKey cacheKey) {
    if (!cacheEnabled) {
      return false;
    }
    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry == null) {
      if (removedBlock != null) {
        cacheStats.evicted(0, cacheKey.isPrimary());
        return true;
      } else {
        return false;
      }
    }
    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
    try {
      lock.writeLock().lock();
      if (backingMap.remove(cacheKey, bucketEntry)) {
        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
      } else {
        return false;
      }
    } finally {
      lock.writeLock().unlock();
    }
    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
    return true;
  }

  private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
    RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
    if (removedBlock != null) {
      this.blockNumber.decrementAndGet();
      this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
    }
    return removedBlock;
  }

  public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
    if (!cacheEnabled) {
      return false;
    }
    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry == null) {
      if (removedBlock != null) {
        cacheStats.evicted(0, cacheKey.isPrimary());
        return true;
      } else {
        return false;
      }
    }
    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
    try {
      lock.writeLock().lock();
      int refCount = bucketEntry.refCount.get();
      if (refCount == 0) {
        if (backingMap.remove(cacheKey, bucketEntry)) {
          blockEvicted(cacheKey, bucketEntry, removedBlock == null);
        } else {
          return false;
        }
      } else {
        if (!deletedBlock) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("This block " + cacheKey + " is still referred to by " + refCount
                + " readers. Can not be freed now");
          }
          return false;
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("This block " + cacheKey + " is still referred to by " + refCount
                + " readers. Can not be freed now. Hence will mark this"
                + " for evicting at a later point");
          }
          bucketEntry.markedForEvict = true;
        }
      }
    } finally {
      lock.writeLock().unlock();
    }
    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
    return true;
  }

  /*
   * Statistics thread. Periodically outputs cache statistics to the log.
   */
  private static class StatisticsThread extends Thread {
    private final BucketCache bucketCache;

    public StatisticsThread(BucketCache bucketCache) {
      super("BucketCacheStatsThread");
      setDaemon(true);
      this.bucketCache = bucketCache;
    }

    @Override
    public void run() {
      bucketCache.logStats();
    }
  }

  public void logStats() {
    long totalSize = bucketAllocator.getTotalSize();
    long usedSize = bucketAllocator.getUsedSize();
    long freeSize = totalSize - usedSize;
    long cacheSize = getRealCacheSize();
    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " +
        "totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
        "usedSize=" + StringUtils.byteDesc(usedSize) + ", " +
        "cacheSize=" + StringUtils.byteDesc(cacheSize) + ", " +
        "accesses=" + cacheStats.getRequestCount() + ", " +
        "hits=" + cacheStats.getHitCount() + ", " +
        "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
        "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " +
        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
          (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) +
        "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
        "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
        "cachingHitsRatio=" + (cacheStats.getHitCachingCount() == 0 ? "0," :
          (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) +
        "evictions=" + cacheStats.getEvictionCount() + ", " +
        "evicted=" + cacheStats.getEvictedCount() + ", " +
        "evictedPerRun=" + cacheStats.evictedPerEviction());
    cacheStats.reset();
  }

  public long getRealCacheSize() {
    return this.realCacheSize.get();
  }

  private long acceptableSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR);
  }

  private long singleSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize()
        * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR);
  }

  private long multiSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR
        * DEFAULT_MIN_FACTOR);
  }

  private long memorySize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR
        * DEFAULT_MIN_FACTOR);
  }

  /**
   * Free space if the used size reaches acceptableSize() or a block of one size
   * couldn't be allocated. When freeing space, we use the LRU algorithm and
   * ensure that some blocks are actually evicted.
   * @param why Why we are being called
   */
  private void freeSpace(final String why) {
    // Ensure only one freeSpace run at a time
    if (!freeSpaceLock.tryLock()) {
      return;
    }
    try {
      freeInProgress = true;
      long bytesToFreeWithoutExtra = 0;
      // Calculate the bytes to free for each bucket size (BucketSizeInfo)
      StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() : null;
      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
      long[] bytesToFreeForBucket = new long[stats.length];
      for (int i = 0; i < stats.length; i++) {
        bytesToFreeForBucket[i] = 0;
        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
        freeGoal = Math.max(freeGoal, 1);
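        // With DEFAULT_MIN_FACTOR = 0.85 the goal is to keep at least 15% of each bucket
        // size free, but always at least one item; e.g. 100 items of a given size yield a
        // free goal of floor(100 * 0.15) = 15 items.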
        if (stats[i].freeCount() < freeGoal) {
          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
          if (msgBuffer != null) {
            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
          }
        }
      }
      if (msgBuffer != null) {
        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
      }

      if (bytesToFreeWithoutExtra <= 0) {
        return;
      }
      long currentSize = bucketAllocator.getUsedSize();
      long totalSize = bucketAllocator.getTotalSize();
      if (LOG.isDebugEnabled() && msgBuffer != null) {
        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
          " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
          StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize));
      }

      long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
          * (1 + DEFAULT_EXTRA_FREE_FACTOR));

      // Instantiate priority buckets
      BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
          blockSize, singleSize());
      BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
          blockSize, multiSize());
      BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
          blockSize, memorySize());

      // Scan entire map putting bucket entry into appropriate bucket entry
      // group
      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
        switch (bucketEntryWithKey.getValue().getPriority()) {
          case SINGLE: {
            bucketSingle.add(bucketEntryWithKey);
            break;
          }
          case MULTI: {
            bucketMulti.add(bucketEntryWithKey);
            break;
          }
          case MEMORY: {
            bucketMemory.add(bucketEntryWithKey);
            break;
          }
        }
      }

      PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3);

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

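      // Poll the groups in ascending order of overflow (the PriorityQueue orders by
      // overflow via compareTo) and free from each at most its overflow, capped by an
      // even share of what still remains to be freed.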
      int remainingBuckets = 3;
      long bytesFreed = 0;

      BucketEntryGroup bucketGroup;
      while ((bucketGroup = bucketQueue.poll()) != null) {
        long overflow = bucketGroup.overflow();
        if (overflow > 0) {
          long bucketBytesToFree = Math.min(overflow,
              (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
          bytesFreed += bucketGroup.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }

      /**
       * Check whether extra freeing is needed because some bucket size still
       * lacks free space
       */
      stats = bucketAllocator.getIndexStatistics();
      boolean needFreeForExtra = false;
      for (int i = 0; i < stats.length; i++) {
        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
        freeGoal = Math.max(freeGoal, 1);
        if (stats[i].freeCount() < freeGoal) {
          needFreeForExtra = true;
          break;
        }
      }

      if (needFreeForExtra) {
        bucketQueue.clear();
        remainingBuckets = 2;

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);

        while ((bucketGroup = bucketQueue.poll()) != null) {
          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
          bytesFreed += bucketGroup.free(bucketBytesToFree);
          remainingBuckets--;
        }
      }

      if (LOG.isDebugEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.debug("Bucket cache free space completed; " + "freed="
          + StringUtils.byteDesc(bytesFreed) + ", " + "total="
          + StringUtils.byteDesc(totalSize) + ", " + "single="
          + StringUtils.byteDesc(single) + ", " + "multi="
          + StringUtils.byteDesc(multi) + ", " + "memory="
          + StringUtils.byteDesc(memory));
      }

    } catch (Throwable t) {
      LOG.warn("Failed freeing space", t);
    } finally {
      cacheStats.evict();
      freeInProgress = false;
      freeSpaceLock.unlock();
    }
  }

  // This handles flushing the RAM cache to IOEngine.
  @VisibleForTesting
  class WriterThread extends HasThread {
    private final BlockingQueue<RAMQueueEntry> inputQueue;
    private volatile boolean writerEnabled = true;

    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
      this.inputQueue = queue;
    }

    // Used for test
    @VisibleForTesting
    void disableWriter() {
      this.writerEnabled = false;
    }

    public void run() {
      List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            try {
              // Blocks
              entries = getRAMQueueEntries(inputQueue, entries);
            } catch (InterruptedException ie) {
              if (!cacheEnabled) break;
            }
            doDrain(entries);
          } catch (Exception ioe) {
            LOG.error("WriterThread encountered error", ioe);
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed doing drain", t);
      }
      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
    }

    /**
     * Flush the entries in ramCache to the IOEngine and add bucket entries to backingMap.
     * Process all entries passed in, even on failure, being sure to remove them from
     * ramCache; otherwise we'll never undo the references and we'll OOME.
     * @param entries Presumes the list passed in here will be processed by this invocation
     *   only. No interference expected.
     * @throws InterruptedException
     */
    @VisibleForTesting
    void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
      if (entries.isEmpty()) {
        return;
      }
      // This method is a little hard to follow. We run through the passed-in entries and, for
      // each successful add, we add a non-null BucketEntry to the bucketEntries array below.
      // Later we must do cleanup, making sure we've cleared ramCache of all entries regardless
      // of whether we successfully added the item to the bucketcache; if we don't do the
      // cleanup, we'll OOME by filling ramCache. We do the cleanup by again running through
      // the passed-in entries, doing extra work when we find a non-null corresponding entry
      // in bucketEntries.
      final int size = entries.size();
      BucketEntry[] bucketEntries = new BucketEntry[size];
      // The index is updated inside the loop on success or when we can't succeed. We retry if
      // the cache is full when we go to add an entry, by going around the loop again without
      // upping the index.
      int index = 0;
      while (cacheEnabled && index < size) {
        RAMQueueEntry re = null;
        try {
          re = entries.get(index);
          if (re == null) {
            LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
            index++;
            continue;
          }
          BucketEntry bucketEntry =
            re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
          // Successfully added. Up index and add bucketEntry. Clear io exceptions.
          bucketEntries[index] = bucketEntry;
          if (ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          index++;
        } catch (BucketAllocatorException fle) {
          LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
          // Presume we can't add. Too big? Move index on. Entry will be cleared from ramCache
          // below.
          bucketEntries[index] = null;
          index++;
        } catch (CacheFullException cfe) {
          // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
          if (!freeInProgress) {
            freeSpace("Full!");
          } else {
            Thread.sleep(50);
          }
        } catch (IOException ioex) {
          // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
          LOG.error("Failed writing to bucket cache", ioex);
          checkIOErrorIsTolerated();
        }
      }

      // Make sure data pages are written on media before we update maps.
      try {
        ioEngine.sync();
      } catch (IOException ioex) {
        LOG.error("Failed syncing IO engine", ioex);
        checkIOErrorIsTolerated();
        // Since we failed sync, free the blocks in bucket allocator
        for (int i = 0; i < entries.size(); ++i) {
          if (bucketEntries[i] != null) {
            bucketAllocator.freeBlock(bucketEntries[i].offset());
            bucketEntries[i] = null;
          }
        }
      }

      // Now add to backingMap if successfully added to bucket cache.  Remove from ramCache if
      // success or error.
      for (int i = 0; i < size; ++i) {
        BlockCacheKey key = entries.get(i).getKey();
        // Only add if non-null entry.
        if (bucketEntries[i] != null) {
          backingMap.put(key, bucketEntries[i]);
        }
        // Always remove from ramCache even if we failed adding it to the block cache above.
        RAMQueueEntry ramCacheEntry = ramCache.remove(key);
        if (ramCacheEntry != null) {
          heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
        } else if (bucketEntries[i] != null) {
          // Block should have already been evicted. Remove it and free space.
          ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
          try {
            lock.writeLock().lock();
            if (backingMap.remove(key, bucketEntries[i])) {
              blockEvicted(key, bucketEntries[i], false);
            }
          } finally {
            lock.writeLock().unlock();
          }
        }
      }

      long used = bucketAllocator.getUsedSize();
      if (used > acceptableSize()) {
        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
      }
      return;
    }
  }

  /**
   * Blocks until elements are available in {@code q}, then tries to grab as many as possible
   * before returning.
   * @param q The queue to take from.
   * @param receptacle Where to stash the elements taken from the queue. We clear it before
   *     use, just in case.
   * @return {@code receptacle} laden with elements taken from the queue, or empty if none found.
   */
  @VisibleForTesting
  static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
      final List<RAMQueueEntry> receptacle) throws InterruptedException {
    // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it's
    // OK even if the list grew to accommodate thousands.
    receptacle.clear();
    receptacle.add(q.take());
    q.drainTo(receptacle);
    return receptacle;
  }

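  // The persisted file layout written below (and read back in retrieveFromFile) is, in
  // order: cache capacity (long), IOEngine class name (UTF), backing map class name (UTF),
  // then the serialized deserialiserMap and backingMap objects.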
  private void persistToFile() throws IOException {
    assert !cacheEnabled;
    FileOutputStream fos = null;
    ObjectOutputStream oos = null;
    try {
      if (!ioEngine.isPersistent()) {
        throw new IOException("Attempt to persist non-persistent cache mappings!");
      }
      fos = new FileOutputStream(persistencePath, false);
      oos = new ObjectOutputStream(fos);
      oos.writeLong(cacheCapacity);
      oos.writeUTF(ioEngine.getClass().getName());
      oos.writeUTF(backingMap.getClass().getName());
      oos.writeObject(deserialiserMap);
      oos.writeObject(backingMap);
    } finally {
      if (oos != null) oos.close();
      if (fos != null) fos.close();
    }
  }

  @SuppressWarnings("unchecked")
  private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
      ClassNotFoundException {
    File persistenceFile = new File(persistencePath);
    if (!persistenceFile.exists()) {
      return;
    }
    assert !cacheEnabled;
    FileInputStream fis = null;
    ObjectInputStream ois = null;
    try {
      if (!ioEngine.isPersistent())
        throw new IOException(
            "Attempt to restore non-persistent cache mappings!");
      fis = new FileInputStream(persistencePath);
      ois = new ObjectInputStream(fis);
      long capacitySize = ois.readLong();
      if (capacitySize != cacheCapacity)
        throw new IOException("Mismatched cache capacity:"
            + StringUtils.byteDesc(capacitySize) + ", expected: "
            + StringUtils.byteDesc(cacheCapacity));
      String ioclass = ois.readUTF();
      String mapclass = ois.readUTF();
      if (!ioEngine.getClass().getName().equals(ioclass))
        throw new IOException("Class name for IO engine mismatch: " + ioclass
            + ", expected:" + ioEngine.getClass().getName());
      if (!backingMap.getClass().getName().equals(mapclass))
        throw new IOException("Class name for cache map mismatch: " + mapclass
            + ", expected:" + backingMap.getClass().getName());
      UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
          .readObject();
      ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
          (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
      // Rebuild the allocator from the persisted map (not the still-empty field) so the
      // restored entries' buckets are marked as allocated before the cache goes live.
      BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
          backingMapFromFile, realCacheSize);
      backingMap = backingMapFromFile;
      bucketAllocator = allocator;
      deserialiserMap = deserMap;
    } finally {
      if (ois != null) ois.close();
      if (fis != null) fis.close();
      if (!persistenceFile.delete()) {
        throw new IOException("Failed deleting persistence file "
            + persistenceFile.getAbsolutePath());
      }
    }
  }

  /**
   * Check whether we tolerate the IO error this time. If the duration of the IOEngine
   * throwing errors exceeds ioErrorsTolerationDuration, we will disable the
   * cache
   */
  private void checkIOErrorIsTolerated() {
    long now = EnvironmentEdgeManager.currentTime();
    if (this.ioErrorStartTime > 0) {
      if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
          "ms, disabling cache, please check your IOEngine");
        disableCache();
      }
    } else {
      this.ioErrorStartTime = now;
    }
  }

  /**
   * Used to shut down the cache -or- turn it off in the case of something broken.
   */
  private void disableCache() {
    if (!cacheEnabled) return;
    cacheEnabled = false;
    ioEngine.shutdown();
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt();
    this.ramCache.clear();
    if (!ioEngine.isPersistent() || persistencePath == null) {
      // With a persistent ioengine and a path we keep the backingMap so it can be
      // serialized out in shutdown(); otherwise clear it.
      this.backingMap.clear();
    }
  }

  private void join() throws InterruptedException {
    for (int i = 0; i < writerThreads.length; ++i)
      writerThreads[i].join();
  }

  @Override
  public void shutdown() {
    disableCache();
    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
        + "; path to write=" + persistencePath);
    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        join();
        persistToFile();
      } catch (IOException ex) {
        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
      } catch (InterruptedException e) {
        LOG.warn("Failed to persist data on exit", e);
      }
    }
  }

  @Override
  public CacheStats getStats() {
    return cacheStats;
  }

  public BucketAllocator getAllocator() {
    return this.bucketAllocator;
  }

  @Override
  public long heapSize() {
    return this.heapSize.get();
  }

  @Override
  public long size() {
    return this.realCacheSize.get();
  }

  @Override
  public long getFreeSize() {
    return this.bucketAllocator.getFreeSize();
  }

  @Override
  public long getBlockCount() {
    return this.blockNumber.get();
  }

  @Override
  public long getCurrentSize() {
    return this.bucketAllocator.getUsedSize();
  }

  /**
   * Evicts all blocks for a specific HFile.
   * <p>
   * This is used for evict-on-close to remove all blocks of a specific HFile.
   *
   * @return the number of blocks evicted
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    Set<BlockCacheKey> keySet = blocksByHFile.subSet(
        new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
        new BlockCacheKey(hfileName, Long.MAX_VALUE), true);

    int numEvicted = 0;
    for (BlockCacheKey key : keySet) {
      if (evictBlock(key)) {
        ++numEvicted;
      }
    }

    return numEvicted;
  }

  /**
   * Item in cache. We expect this to be where most memory goes. Java uses 8
   * bytes just for object headers; after this, we want to use as little as
   * possible - so we only use 8 bytes, but in order to do so we end up messing
   * around with all this Java casting stuff. Offset stored as 5 bytes that make
   * up the long. Doubt we'll see devices this big for ages. Offsets are divided
   * by 256. So 5 bytes gives us 256TB or so.
   */
  static class BucketEntry implements Serializable {
    private static final long serialVersionUID = -6741504807982257534L;

    // Access counter comparator, descending order
    static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {

      @Override
      public int compare(BucketEntry o1, BucketEntry o2) {
        return Long.compare(o2.accessCounter, o1.accessCounter);
      }
    };

    private int offsetBase;
    private int length;
    private byte offset1;
    byte deserialiserIndex;
    private volatile long accessCounter;
    private BlockPriority priority;
    // Set this when we were not able to forcefully evict the block
    private volatile boolean markedForEvict;
    private AtomicInteger refCount = new AtomicInteger(0);

    /**
     * Time this block was cached.  Presumes we are created just before we are added to the cache.
     */
    private final long cachedTime = System.nanoTime();

    BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
      setOffset(offset);
      this.length = length;
      this.accessCounter = accessCounter;
      if (inMemory) {
        this.priority = BlockPriority.MEMORY;
      } else {
        this.priority = BlockPriority.SINGLE;
      }
    }

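    // The offset is stored divided by 256 (it is always 256-byte aligned): the low 32 bits
    // of the shifted value go into offsetBase and the next 8 bits into offset1, i.e. 40
    // significant bits in all. Decoding reassembles the 40-bit value and shifts it back up
    // by 8, giving a maximum addressable range of 2^48 bytes, i.e. 256TB.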
    long offset() { // Java has no unsigned numbers
      // The mask must be a long literal: the int literal 0xFFFFFFFF is -1, widens to
      // 0xFFFFFFFFFFFFFFFFL, and would leave a negative offsetBase sign-extended.
      long o = ((long) offsetBase) & 0xFFFFFFFFL;
      o += (((long) (offset1)) & 0xFF) << 32;
      return o << 8;
    }

    private void setOffset(long value) {
      assert (value & 0xFF) == 0;
      value >>= 8;
      offsetBase = (int) value;
      offset1 = (byte) (value >> 32);
    }

    public int getLength() {
      return length;
    }

    protected CacheableDeserializer<Cacheable> deserializerReference(
        UniqueIndexMap<Integer> deserialiserMap) {
      return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
          .unmap(deserialiserIndex));
    }

    protected void setDeserialiserReference(
        CacheableDeserializer<Cacheable> deserializer,
        UniqueIndexMap<Integer> deserialiserMap) {
      this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
          .getDeserialiserIdentifier()));
    }

    /**
     * Block has been accessed. Update its local access counter.
     */
    public void access(long accessCounter) {
      this.accessCounter = accessCounter;
      if (this.priority == BlockPriority.SINGLE) {
        this.priority = BlockPriority.MULTI;
      }
    }

    public BlockPriority getPriority() {
      return this.priority;
    }

    public long getCachedTime() {
      return cachedTime;
    }
  }

  /**
   * Used to group bucket entries into priority buckets. There will be a
   * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
   * the eviction algorithm takes the appropriate number of elements out of each
   * according to configuration parameters and their relative sizes.
   */
  private class BucketEntryGroup implements Comparable<BucketEntryGroup> {

    private CachedEntryQueue queue;
    private long totalSize = 0;
    private long bucketSize;

    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
      this.bucketSize = bucketSize;
      queue = new CachedEntryQueue(bytesToFree, blockSize);
      totalSize = 0;
    }

    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
      totalSize += block.getValue().getLength();
      queue.add(block);
    }

    public long free(long toFree) {
      Map.Entry<BlockCacheKey, BucketEntry> entry;
      long freedBytes = 0;
      // TODO avoid a cycling situation. We may find no block which is not in use, and so have
      // no way to free. What to do then? Caching attempt fail? Need some changes in cacheBlock
      // API?
      while ((entry = queue.pollLast()) != null) {
        if (evictBlock(entry.getKey(), false)) {
          freedBytes += entry.getValue().getLength();
        }
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      return freedBytes;
    }

    public long overflow() {
      return totalSize - bucketSize;
    }

    public long totalSize() {
      return totalSize;
    }

    @Override
    public int compareTo(BucketEntryGroup that) {
      return Long.compare(this.overflow(), that.overflow());
    }

    @Override
    public boolean equals(Object that) {
      return this == that;
    }

  }

  /**
   * A block entry staged in RAM, holding the block's key, data and access metadata
   * until a WriterThread drains it to the IOEngine.
   */
  @VisibleForTesting
  static class RAMQueueEntry {
    private BlockCacheKey key;
    private Cacheable data;
    private long accessCounter;
    private boolean inMemory;

    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
        boolean inMemory) {
      this.key = bck;
      this.data = data;
      this.accessCounter = accessCounter;
      this.inMemory = inMemory;
    }

    public Cacheable getData() {
      return data;
    }

    public BlockCacheKey getKey() {
      return key;
    }

    public void access(long accessCounter) {
      this.accessCounter = accessCounter;
    }

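    /**
     * Allocate space for this entry in the bucket allocator, serialize the block into
     * the IOEngine at the allocated offset and return the resulting BucketEntry, or
     * null if the block cannot be serialized. Frees the allocation again if the write
     * fails.
     */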
    public BucketEntry writeToCache(final IOEngine ioEngine,
        final BucketAllocator bucketAllocator,
        final UniqueIndexMap<Integer> deserialiserMap,
        final AtomicLong realCacheSize) throws CacheFullException, IOException,
        BucketAllocatorException {
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized
      if (len == 0) return null;
      long offset = bucketAllocator.allocateBlock(len);
      BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
      try {
        if (data instanceof HFileBlock) {
          // If an instance of HFileBlock, save on some allocations.
          HFileBlock block = (HFileBlock) data;
          ByteBuff sliceBuf = block.getBufferReadOnly();
          ByteBuffer metadata = block.getMetaData();
          if (LOG.isTraceEnabled()) {
            LOG.trace("Write offset=" + offset + ", len=" + len);
          }
          ioEngine.write(sliceBuf, offset);
          ioEngine.write(metadata, offset + len - metadata.limit());
        } else {
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb);
          ioEngine.write(bb, offset);
        }
      } catch (IOException ioe) {
        // Free it in the bucket allocator
        bucketAllocator.freeBlock(offset);
        throw ioe;
      }

      realCacheSize.addAndGet(len);
      return bucketEntry;
    }
  }

  /**
   * Only used in tests
   * @throws InterruptedException
   */
  void stopWriterThreads() throws InterruptedException {
    for (WriterThread writerThread : writerThreads) {
      writerThread.disableWriter();
      writerThread.interrupt();
      writerThread.join();
    }
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    // Don't bother with ramCache since stuff is in here only a little while.
    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
        this.backingMap.entrySet().iterator();
    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public CachedBlock next() {
        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return e.getValue().getPriority();
          }

          @Override
          public BlockType getBlockType() {
            // Not held by BucketEntry.  Could add it if wanted on BucketEntry creation.
            return null;
          }

          @Override
          public long getOffset() {
            return e.getKey().getOffset();
          }

          @Override
          public long getSize() {
            return e.getValue().getLength();
          }

          @Override
          public long getCachedTime() {
            return e.getValue().getCachedTime();
          }

          @Override
          public String getFilename() {
            return e.getKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) return diff;

            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) return diff;
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException("" + this.getCachedTime() + ", " +
                other.getCachedTime());
            }
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          @Override
          public int hashCode() {
            return e.getKey().hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock) obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }

  @Override
  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
    if (block.getMemoryType() == MemoryType.SHARED) {
      BucketEntry bucketEntry = backingMap.get(cacheKey);
      if (bucketEntry != null) {
        int refCount = bucketEntry.refCount.decrementAndGet();
        if (bucketEntry.markedForEvict && refCount == 0) {
          forceEvict(cacheKey);
        }
      }
    }
  }

  @VisibleForTesting
  public int getRefCount(BlockCacheKey cacheKey) {
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry != null) {
      return bucketEntry.refCount.get();
    }
    return 0;
  }
}