View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
 *
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   */
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.io.File;
24  import java.io.FileInputStream;
25  import java.io.FileNotFoundException;
26  import java.io.FileOutputStream;
27  import java.io.IOException;
28  import java.io.ObjectInputStream;
29  import java.io.ObjectOutputStream;
30  import java.io.Serializable;
31  import java.nio.ByteBuffer;
32  import java.util.ArrayList;
33  import java.util.Comparator;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.PriorityQueue;
37  import java.util.Set;
38  import java.util.concurrent.ArrayBlockingQueue;
39  import java.util.concurrent.BlockingQueue;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.Executors;
42  import java.util.concurrent.ScheduledExecutorService;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicLong;
45  import java.util.concurrent.locks.Lock;
46  import java.util.concurrent.locks.ReentrantLock;
47  
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.apache.hadoop.classification.InterfaceAudience;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.hadoop.hbase.io.HeapSize;
53  import org.apache.hadoop.hbase.io.hfile.BlockCache;
54  import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
55  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
56  import org.apache.hadoop.hbase.io.hfile.CacheStats;
57  import org.apache.hadoop.hbase.io.hfile.Cacheable;
58  import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
59  import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
60  import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
61  import org.apache.hadoop.hbase.io.hfile.HFileBlock;
62  import org.apache.hadoop.hbase.regionserver.StoreFile;
63  import org.apache.hadoop.hbase.util.ConcurrentIndex;
64  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
65  import org.apache.hadoop.hbase.util.HasThread;
66  import org.apache.hadoop.hbase.util.IdLock;
67  import org.apache.hadoop.util.StringUtils;
68  
69  import com.google.common.collect.ImmutableList;
70  import com.google.common.util.concurrent.ThreadFactoryBuilder;
71  
/**
 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
 * {@link BucketCache#ramCache} and {@link BucketCache#backingMap} in order to
 * determine whether a given element is a hit. It can use memory
 * {@link ByteBufferIOEngine} or file {@link FileIOEngine} to store/read the
 * block data.
 * 
 * Eviction uses a similar algorithm as
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
 * 
 * BucketCache can be used mainly as a block cache (see
 * {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS
 * pauses and heap fragmentation caused by GC.
 * 
 * It can also be used as a secondary cache (e.g. using Fusion-IO to store
 * blocks) to enlarge cache space via
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
 */
90  @InterfaceAudience.Private
91  public class BucketCache implements BlockCache, HeapSize {
92    static final Log LOG = LogFactory.getLog(BucketCache.class);
93  
94    /** Priority buckets */
95    private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
96    private static final float DEFAULT_MULTI_FACTOR = 0.50f;
97    private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
98    private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
99  
100   private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
101   private static final float DEFAULT_MIN_FACTOR = 0.85f;
102 
103   /** Statistics thread */
104   private static final int statThreadPeriod = 3 * 60;
105 
106   final static int DEFAULT_WRITER_THREADS = 3;
107   final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
108 
109   // Store/read block data
110   IOEngine ioEngine;
111 
112   // Store the block in this map before writing it to cache
113   private ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> ramCache;
114   // In this map, store the block's meta data like offset, length
115   private ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;
116 
117   /**
118    * Flag if the cache is enabled or not... We shut it off if there are IO
119    * errors for some time, so that Bucket IO exceptions/errors don't bring down
120    * the HBase server.
121    */
122   private volatile boolean cacheEnabled;
123 
124   private ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = 
125       new ArrayList<BlockingQueue<RAMQueueEntry>>();
126   WriterThread writerThreads[];
127 
128 
129 
130   /** Volatile boolean to track if free space is in process or not */
131   private volatile boolean freeInProgress = false;
132   private Lock freeSpaceLock = new ReentrantLock();
133 
134   private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();
135 
136   private final AtomicLong realCacheSize = new AtomicLong(0);
137   private final AtomicLong heapSize = new AtomicLong(0);
138   /** Current number of cached elements */
139   private final AtomicLong blockNumber = new AtomicLong(0);
140   private final AtomicLong failedBlockAdditions = new AtomicLong(0);
141 
142   /** Cache access count (sequential ID) */
143   private final AtomicLong accessCount = new AtomicLong(0);
144 
145   private final Object[] cacheWaitSignals;
146   private static final int DEFAULT_CACHE_WAIT_TIME = 50;
147   // Used in test now. If the flag is false and the cache speed is very fast,
148   // bucket cache will skip some blocks when caching. If the flag is true, we
149   // will wait blocks flushed to IOEngine for some time when caching
150   boolean wait_when_cache = false;
151 
152   private BucketCacheStats cacheStats = new BucketCacheStats();
153 
154   private String persistencePath;
155   private long cacheCapacity;
156   /** Approximate block size */
157   private final long blockSize;
158 
159   /** Duration of IO errors tolerated before we disable cache, 1 min as default */
160   private final int ioErrorsTolerationDuration;
161   // 1 min
162   public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
163   // Start time of first IO error when reading or writing IO Engine, it will be
164   // reset after a successful read/write.
165   private volatile long ioErrorStartTime = -1;
166 
167   /**
168    * A "sparse lock" implementation allowing to lock on a particular block
169    * identified by offset. The purpose of this is to avoid freeing the block
170    * which is being read.
171    * 
172    * TODO:We could extend the IdLock to IdReadWriteLock for better.
173    */
174   private IdLock offsetLock = new IdLock();
175 
176   private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
177       new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
178         @Override
179         public int compare(BlockCacheKey a, BlockCacheKey b) {
180           if (a.getOffset() == b.getOffset()) {
181             return 0;
182           } else if (a.getOffset() < b.getOffset()) {
183             return -1;
184           }
185           return 1;
186         }
187       });
188 
189   /** Statistics thread schedule pool (for heavy debugging, could remove) */
190   private final ScheduledExecutorService scheduleThreadPool =
191     Executors.newScheduledThreadPool(1,
192       new ThreadFactoryBuilder()
193         .setNameFormat("BucketCache Statistics #%d")
194         .setDaemon(true)
195         .build());
196 
197   // Allocate or free space for the block
198   private BucketAllocator bucketAllocator;
199   
200   public BucketCache(String ioEngineName, long capacity, int blockSize, int writerThreadNum,
201       int writerQLen, String persistencePath) throws FileNotFoundException,
202       IOException {
203     this(ioEngineName, capacity, blockSize, writerThreadNum, writerQLen, persistencePath,
204         DEFAULT_ERROR_TOLERATION_DURATION);
205   }
206   
207   public BucketCache(String ioEngineName, long capacity, int blockSize, int writerThreadNum,
208       int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
209       throws FileNotFoundException, IOException {
210     this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
211     this.writerThreads = new WriterThread[writerThreadNum];
212     this.cacheWaitSignals = new Object[writerThreadNum];
213     long blockNumCapacity = capacity / blockSize;
214     if (blockNumCapacity >= Integer.MAX_VALUE) {
215       // Enough for about 32TB of cache!
216       throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
217     }
218 
219     this.cacheCapacity = capacity;
220     this.persistencePath = persistencePath;
221     this.blockSize = blockSize;
222     this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
223 
224     bucketAllocator = new BucketAllocator(capacity);
225     for (int i = 0; i < writerThreads.length; ++i) {
226       writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
227       this.cacheWaitSignals[i] = new Object();
228     }
229 
230     assert writerQueues.size() == writerThreads.length;
231     this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();
232 
233     this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);
234 
235     if (ioEngine.isPersistent() && persistencePath != null) {
236       try {
237         retrieveFromFile();
238       } catch (IOException ioex) {
239         LOG.error("Can't restore from file because of", ioex);
240       } catch (ClassNotFoundException cnfe) {
241         LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
242         throw new RuntimeException(cnfe);
243       }
244     }
245     final String threadName = Thread.currentThread().getName();
246     this.cacheEnabled = true;
247     for (int i = 0; i < writerThreads.length; ++i) {
248       writerThreads[i] = new WriterThread(writerQueues.get(i), i);
249       writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
250       writerThreads[i].start();
251     }
252     // Run the statistics thread periodically to print the cache statistics log
253     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
254         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
255     LOG.info("Started bucket cache");
256   }
257 
258   /**
259    * Get the IOEngine from the IO engine name
260    * @param ioEngineName
261    * @param capacity
262    * @return the IOEngine
263    * @throws IOException
264    */
265   private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
266       throws IOException {
267     if (ioEngineName.startsWith("file:"))
268       return new FileIOEngine(ioEngineName.substring(5), capacity);
269     else if (ioEngineName.startsWith("offheap"))
270       return new ByteBufferIOEngine(capacity, true);
271     else if (ioEngineName.startsWith("heap"))
272       return new ByteBufferIOEngine(capacity, false);
273     else
274       throw new IllegalArgumentException(
275           "Don't understand io engine name for cache - prefix with file:, heap or offheap");
276   }
277 
278   /**
279    * Cache the block with the specified name and buffer.
280    * @param cacheKey block's cache key
281    * @param buf block buffer
282    */
283   @Override
284   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
285     cacheBlock(cacheKey, buf, false);
286   }
287 
288   /**
289    * Cache the block with the specified name and buffer.
290    * @param cacheKey block's cache key
291    * @param cachedItem block buffer
292    * @param inMemory if block is in-memory
293    */
294   @Override
295   public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
296     cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
297   }
298 
299   /**
300    * Cache the block to ramCache
301    * @param cacheKey block's cache key
302    * @param cachedItem block buffer
303    * @param inMemory if block is in-memory
304    * @param wait if true, blocking wait when queue is full
305    */
306   public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem,
307       boolean inMemory, boolean wait) {
308     if (!cacheEnabled)
309       return;
310 
311     if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
312       return;
313 
314     /*
315      * Stuff the entry into the RAM cache so it can get drained to the
316      * persistent store
317      */
318     RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem,
319         accessCount.incrementAndGet(), inMemory);
320     ramCache.put(cacheKey, re);
321     int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
322     BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
323     boolean successfulAddition = bq.offer(re);
324     if (!successfulAddition && wait) {
325       synchronized (cacheWaitSignals[queueNum]) {
326         try {
327           cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME);
328         } catch (InterruptedException ie) {
329           Thread.currentThread().interrupt();
330         }
331       }
332       successfulAddition = bq.offer(re);
333     }
334     if (!successfulAddition) {
335         ramCache.remove(cacheKey);
336         failedBlockAdditions.incrementAndGet();
337     } else {
338       this.blockNumber.incrementAndGet();
339       this.heapSize.addAndGet(cachedItem.heapSize());
340       blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
341     }
342   }
343 
344   /**
345    * Get the buffer of the block with the specified key.
346    * @param key block's cache key
347    * @param caching true if the caller caches blocks on cache misses
348    * @param repeat Whether this is a repeat lookup for the same block
349    * @return buffer of specified cache key, or null if not in cache
350    */
351   @Override
352   public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
353     if (!cacheEnabled)
354       return null;
355     RAMQueueEntry re = ramCache.get(key);
356     if (re != null) {
357       cacheStats.hit(caching);
358       re.access(accessCount.incrementAndGet());
359       return re.getData();
360     }
361     BucketEntry bucketEntry = backingMap.get(key);
362     if(bucketEntry!=null) {
363       long start = System.nanoTime();
364       IdLock.Entry lockEntry = null;
365       try {
366         lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
367         if (bucketEntry.equals(backingMap.get(key))) {
368           int len = bucketEntry.getLength();
369           ByteBuffer bb = ByteBuffer.allocate(len);
370           int lenRead = ioEngine.read(bb, bucketEntry.offset());
371           if (lenRead != len) {
372             throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
373           }
374           Cacheable cachedBlock = bucketEntry.deserializerReference(
375               deserialiserMap).deserialize(bb, true);
376           long timeTaken = System.nanoTime() - start;
377           cacheStats.hit(caching);
378           cacheStats.ioHit(timeTaken);
379           bucketEntry.access(accessCount.incrementAndGet());
380           if (this.ioErrorStartTime > 0) {
381             ioErrorStartTime = -1;
382           }
383           return cachedBlock;
384         }
385       } catch (IOException ioex) {
386         LOG.error("Failed reading block " + key + " from bucket cache", ioex);
387         checkIOErrorIsTolerated();
388       } finally {
389         if (lockEntry != null) {
390           offsetLock.releaseLockEntry(lockEntry);
391         }
392       }
393     }
394     if(!repeat)cacheStats.miss(caching);
395     return null;
396   }
397 
398   @Override
399   public boolean evictBlock(BlockCacheKey cacheKey) {
400     if (!cacheEnabled) return false;
401     RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
402     if (removedBlock != null) {
403       this.blockNumber.decrementAndGet();
404       this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
405     }
406     BucketEntry bucketEntry = backingMap.get(cacheKey);
407     if (bucketEntry != null) {
408       IdLock.Entry lockEntry = null;
409       try {
410         lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
411         if (bucketEntry.equals(backingMap.remove(cacheKey))) {
412           bucketAllocator.freeBlock(bucketEntry.offset());
413           realCacheSize.addAndGet(-1 * bucketEntry.getLength());
414           blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
415           if (removedBlock == null) {
416             this.blockNumber.decrementAndGet();
417           }
418         } else {
419           return false;
420         }
421       } catch (IOException ie) {
422         LOG.warn("Failed evicting block " + cacheKey);
423         return false;
424       } finally {
425         if (lockEntry != null) {
426           offsetLock.releaseLockEntry(lockEntry);
427         }
428       }
429     }
430     cacheStats.evicted();
431     return true;
432   }
433   
434   /*
435    * Statistics thread.  Periodically prints the cache statistics to the log.
436    */
437   private static class StatisticsThread extends Thread {
438     BucketCache bucketCache;
439 
440     public StatisticsThread(BucketCache bucketCache) {
441       super("BucketCache.StatisticsThread");
442       setDaemon(true);
443       this.bucketCache = bucketCache;
444     }
445     @Override
446     public void run() {
447       bucketCache.logStats();
448     }
449   }
450   
451   public void logStats() {
452     if (!LOG.isDebugEnabled()) return;
453     // Log size
454     long totalSize = bucketAllocator.getTotalSize();
455     long usedSize = bucketAllocator.getUsedSize();
456     long freeSize = totalSize - usedSize;
457     long cacheSize = this.realCacheSize.get();
458     LOG.debug("BucketCache Stats: " +
459         "failedBlockAdditions=" + this.failedBlockAdditions.get() + ", " +
460         "total=" + StringUtils.byteDesc(totalSize) + ", " +
461         "free=" + StringUtils.byteDesc(freeSize) + ", " +
462         "usedSize=" + StringUtils.byteDesc(usedSize) +", " +
463         "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " +
464         "accesses=" + cacheStats.getRequestCount() + ", " +
465         "hits=" + cacheStats.getHitCount() + ", " +
466         "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
467         "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
468         "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : 
469           (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
470         "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
471         "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
472         "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : 
473           (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
474         "evictions=" + cacheStats.getEvictionCount() + ", " +
475         "evicted=" + cacheStats.getEvictedCount() + ", " +
476         "evictedPerRun=" + cacheStats.evictedPerEviction());
477     cacheStats.reset();
478   }
479 
480   private long acceptableSize() {
481     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR);
482   }
483 
484   private long minSize() {
485     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MIN_FACTOR);
486   }
487 
488   private long singleSize() {
489     return (long) Math.floor(bucketAllocator.getTotalSize()
490         * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR);
491   }
492 
493   private long multiSize() {
494     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR
495         * DEFAULT_MIN_FACTOR);
496   }
497 
498   private long memorySize() {
499     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR
500         * DEFAULT_MIN_FACTOR);
501   }
502 
503   /**
504    * Free the space if the used size reaches acceptableSize() or one size block
505    * couldn't be allocated. When freeing the space, we use the LRU algorithm and
506    * ensure there must be some blocks evicted
507    */
508   private void freeSpace() {
509     // Ensure only one freeSpace progress at a time
510     if (!freeSpaceLock.tryLock()) return;
511     try {
512       freeInProgress = true;
513       long bytesToFreeWithoutExtra = 0;
514       /*
515        * Calculate free byte for each bucketSizeinfo
516        */
517       StringBuffer msgBuffer = new StringBuffer();
518       BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
519       long[] bytesToFreeForBucket = new long[stats.length];
520       for (int i = 0; i < stats.length; i++) {
521         bytesToFreeForBucket[i] = 0;
522         long freeGoal = (long) Math.floor(stats[i].totalCount()
523             * (1 - DEFAULT_MIN_FACTOR));
524         freeGoal = Math.max(freeGoal, 1);
525         if (stats[i].freeCount() < freeGoal) {
526           bytesToFreeForBucket[i] = stats[i].itemSize()
527           * (freeGoal - stats[i].freeCount());
528           bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
529           msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
530               + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
531         }
532       }
533       msgBuffer.append("Free for total="
534           + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
535 
536       if (bytesToFreeWithoutExtra <= 0) {
537         return;
538       }
539       long currentSize = bucketAllocator.getUsedSize();
540       long totalSize=bucketAllocator.getTotalSize();
541       LOG.debug("Bucket cache free space started; Attempting to  " + msgBuffer.toString()
542           + " of current used=" + StringUtils.byteDesc(currentSize)
543           + ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get())
544           + ",total=" + StringUtils.byteDesc(totalSize));
545       
546       long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
547           * (1 + DEFAULT_EXTRA_FREE_FACTOR));
548 
549       // Instantiate priority buckets
550       BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
551           blockSize, singleSize());
552       BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
553           blockSize, multiSize());
554       BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
555           blockSize, memorySize());
556 
557       // Scan entire map putting bucket entry into appropriate bucket entry
558       // group
559       for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
560         switch (bucketEntryWithKey.getValue().getPriority()) {
561           case SINGLE: {
562             bucketSingle.add(bucketEntryWithKey);
563             break;
564           }
565           case MULTI: {
566             bucketMulti.add(bucketEntryWithKey);
567             break;
568           }
569           case MEMORY: {
570             bucketMemory.add(bucketEntryWithKey);
571             break;
572           }
573         }
574       }
575 
576       PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3);
577 
578       bucketQueue.add(bucketSingle);
579       bucketQueue.add(bucketMulti);
580       bucketQueue.add(bucketMemory);
581 
582       int remainingBuckets = 3;
583       long bytesFreed = 0;
584 
585       BucketEntryGroup bucketGroup;
586       while ((bucketGroup = bucketQueue.poll()) != null) {
587         long overflow = bucketGroup.overflow();
588         if (overflow > 0) {
589           long bucketBytesToFree = Math.min(overflow,
590               (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
591           bytesFreed += bucketGroup.free(bucketBytesToFree);
592         }
593         remainingBuckets--;
594       }
595 
596       /**
597        * Check whether need extra free because some bucketSizeinfo still needs
598        * free space
599        */
600       stats = bucketAllocator.getIndexStatistics();
601       boolean needFreeForExtra = false;
602       for (int i = 0; i < stats.length; i++) {
603         long freeGoal = (long) Math.floor(stats[i].totalCount()
604             * (1 - DEFAULT_MIN_FACTOR));
605         freeGoal = Math.max(freeGoal, 1);
606         if (stats[i].freeCount() < freeGoal) {
607           needFreeForExtra = true;
608           break;
609         }
610       }
611 
612       if (needFreeForExtra) {
613         bucketQueue.clear();
614         remainingBuckets = 2;
615 
616         bucketQueue.add(bucketSingle);
617         bucketQueue.add(bucketMulti);
618 
619         while ((bucketGroup = bucketQueue.poll()) != null) {
620           long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed)
621               / remainingBuckets;
622           bytesFreed += bucketGroup.free(bucketBytesToFree);
623           remainingBuckets--;
624         }
625       }
626 
627       if (LOG.isDebugEnabled()) {
628         long single = bucketSingle.totalSize();
629         long multi = bucketMulti.totalSize();
630         long memory = bucketMemory.totalSize();
631         LOG.debug("Bucket cache free space completed; " + "freed="
632             + StringUtils.byteDesc(bytesFreed) + ", " + "total="
633             + StringUtils.byteDesc(totalSize) + ", " + "single="
634             + StringUtils.byteDesc(single) + ", " + "multi="
635             + StringUtils.byteDesc(multi) + ", " + "memory="
636             + StringUtils.byteDesc(memory));
637       }
638 
639     } finally {
640       cacheStats.evict();
641       freeInProgress = false;
642       freeSpaceLock.unlock();
643     }
644   }
645 
646   // This handles flushing the RAM cache to IOEngine.
647   private class WriterThread extends HasThread {
648     BlockingQueue<RAMQueueEntry> inputQueue;
649     final int threadNO;
650     boolean writerEnabled = true;
651 
652     WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
653       super();
654       this.inputQueue = queue;
655       this.threadNO = threadNO;
656       setDaemon(true);
657     }
658     
659     // Used for test
660     void disableWriter() {
661       this.writerEnabled = false;
662     }
663 
664     public void run() {
665       List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
666       try {
667         while (cacheEnabled && writerEnabled) {
668           try {
669             // Blocks
670             entries.add(inputQueue.take());
671             inputQueue.drainTo(entries);
672             synchronized (cacheWaitSignals[threadNO]) {
673               cacheWaitSignals[threadNO].notifyAll();
674             }
675           } catch (InterruptedException ie) {
676             if (!cacheEnabled) break;
677           }
678           doDrain(entries);
679         }
680       } catch (Throwable t) {
681         LOG.warn("Failed doing drain", t);
682       }
683       LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
684     }
685 
686     /**
687      * Flush the entries in ramCache to IOEngine and add bucket entry to
688      * backingMap
689      * @param entries
690      * @throws InterruptedException
691      */
692     private void doDrain(List<RAMQueueEntry> entries)
693         throws InterruptedException {
694       BucketEntry[] bucketEntries = new BucketEntry[entries.size()];
695       RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()];
696       int done = 0;
697       while (entries.size() > 0 && cacheEnabled) {
698         // Keep going in case we throw...
699         RAMQueueEntry ramEntry = null;
700         try {
701           ramEntry = entries.remove(entries.size() - 1);
702           if (ramEntry == null) {
703             LOG.warn("Couldn't get the entry from RAM queue, who steals it?");
704             continue;
705           }
706           BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
707               bucketAllocator, deserialiserMap, realCacheSize);
708           ramEntries[done] = ramEntry;
709           bucketEntries[done++] = bucketEntry;
710           if (ioErrorStartTime > 0) {
711             ioErrorStartTime = -1;
712           }
713         } catch (BucketAllocatorException fle) {
714           LOG.warn("Failed allocating for block "
715               + (ramEntry == null ? "" : ramEntry.getKey()), fle);
716         } catch (CacheFullException cfe) {
717           if (!freeInProgress) {
718             freeSpace();
719           } else {
720             Thread.sleep(50);
721           }
722         } catch (IOException ioex) {
723           LOG.error("Failed writing to bucket cache", ioex);
724           checkIOErrorIsTolerated();
725         }
726       }
727 
728       // Make sure that the data pages we have written are on the media before
729       // we update the map.
730       try {
731         ioEngine.sync();
732       } catch (IOException ioex) {
733         LOG.error("Faild syncing IO engine", ioex);
734         checkIOErrorIsTolerated();
735         // Since we failed sync, free the blocks in bucket allocator
736         for (int i = 0; i < done; ++i) {
737           if (bucketEntries[i] != null) {
738             bucketAllocator.freeBlock(bucketEntries[i].offset());
739           }
740         }
741         done = 0;
742       }
743 
744       for (int i = 0; i < done; ++i) {
745         if (bucketEntries[i] != null) {
746           backingMap.put(ramEntries[i].getKey(), bucketEntries[i]);
747         }
748         RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey());
749         if (ramCacheEntry != null) {
750           heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize());
751         }
752       }
753 
754       if (bucketAllocator.getUsedSize() > acceptableSize()) {
755         freeSpace();
756       }
757     }
758   }
759 
760   
761 
762   private void persistToFile() throws IOException {
763     assert !cacheEnabled;
764     FileOutputStream fos = null;
765     ObjectOutputStream oos = null;
766     try {
767       if (!ioEngine.isPersistent())
768         throw new IOException(
769             "Attempt to persist non-persistent cache mappings!");
770       fos = new FileOutputStream(persistencePath, false);
771       oos = new ObjectOutputStream(fos);
772       oos.writeLong(cacheCapacity);
773       oos.writeUTF(ioEngine.getClass().getName());
774       oos.writeUTF(backingMap.getClass().getName());
775       oos.writeObject(deserialiserMap);
776       oos.writeObject(backingMap);
777     } finally {
778       if (oos != null) oos.close();
779       if (fos != null) fos.close();
780     }
781   }
782 
783   @SuppressWarnings("unchecked")
784   private void retrieveFromFile() throws IOException, BucketAllocatorException,
785       ClassNotFoundException {
786     File persistenceFile = new File(persistencePath);
787     if (!persistenceFile.exists()) {
788       return;
789     }
790     assert !cacheEnabled;
791     FileInputStream fis = null;
792     ObjectInputStream ois = null;
793     try {
794       if (!ioEngine.isPersistent())
795         throw new IOException(
796             "Attempt to restore non-persistent cache mappings!");
797       fis = new FileInputStream(persistencePath);
798       ois = new ObjectInputStream(fis);
799       long capacitySize = ois.readLong();
800       if (capacitySize != cacheCapacity)
801         throw new IOException("Mismatched cache capacity:"
802             + StringUtils.byteDesc(capacitySize) + ", expected: "
803             + StringUtils.byteDesc(cacheCapacity));
804       String ioclass = ois.readUTF();
805       String mapclass = ois.readUTF();
806       if (!ioEngine.getClass().getName().equals(ioclass))
807         throw new IOException("Class name for IO engine mismatch: " + ioclass
808             + ", expected:" + ioEngine.getClass().getName());
809       if (!backingMap.getClass().getName().equals(mapclass))
810         throw new IOException("Class name for cache map mismatch: " + mapclass
811             + ", expected:" + backingMap.getClass().getName());
812       UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
813           .readObject();
814       BucketAllocator allocator = new BucketAllocator(cacheCapacity,
815           backingMap, this.realCacheSize);
816       backingMap = (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois
817           .readObject();
818       bucketAllocator = allocator;
819       deserialiserMap = deserMap;
820     } finally {
821       if (ois != null) ois.close();
822       if (fis != null) fis.close();
823       if (!persistenceFile.delete()) {
824         throw new IOException("Failed deleting persistence file "
825             + persistenceFile.getAbsolutePath());
826       }
827     }
828   }
829 
830   /**
831    * Check whether we tolerate IO error this time. If the duration of IOEngine
832    * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the
833    * cache
834    */
835   private void checkIOErrorIsTolerated() {
836     long now = EnvironmentEdgeManager.currentTimeMillis();
837     if (this.ioErrorStartTime > 0) {
838       if (cacheEnabled
839           && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
840         LOG.error("IO errors duration time has exceeded "
841             + ioErrorsTolerationDuration
842             + "ms, disabing cache, please check your IOEngine");
843         disableCache();
844       }
845     } else {
846       this.ioErrorStartTime = now;
847     }
848   }
849 
850   /**
851    * Used to shut down the cache -or- turn it off in the case of something
852    * broken.
853    */
854   private void disableCache() {
855     if (!cacheEnabled)
856       return;
857     cacheEnabled = false;
858     ioEngine.shutdown();
859     this.scheduleThreadPool.shutdown();
860     for (int i = 0; i < writerThreads.length; ++i)
861       writerThreads[i].interrupt();
862     this.ramCache.clear();
863     if (!ioEngine.isPersistent() || persistencePath == null) {
864       this.backingMap.clear();
865     }
866   }
867 
868   private void join() throws InterruptedException {
869     for (int i = 0; i < writerThreads.length; ++i)
870       writerThreads[i].join();
871   }
872 
873   @Override
874   public void shutdown() {
875     disableCache();
876     LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
877         + "; path to write=" + persistencePath);
878     if (ioEngine.isPersistent() && persistencePath != null) {
879       try {
880         join();
881         persistToFile();
882       } catch (IOException ex) {
883         LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
884       } catch (InterruptedException e) {
885         LOG.warn("Failed to persist data on exit", e);
886       }
887     }
888   }
889 
890   @Override
891   public CacheStats getStats() {
892     return cacheStats;
893   }
894 
895   BucketAllocator getAllocator() {
896     return this.bucketAllocator;
897   }
898 
899   @Override
900   public long heapSize() {
901     return this.heapSize.get();
902   }
903 
904   @Override
905   public long size() {
906     return this.realCacheSize.get();
907   }
908 
909   @Override
910   public long getFreeSize() {
911     return this.bucketAllocator.getFreeSize();
912   }
913 
914   @Override
915   public long getBlockCount() {
916     return this.blockNumber.get();
917   }
918 
919   @Override
920   public long getCurrentSize() {
921     return this.bucketAllocator.getUsedSize();
922   }
923 
924   @Override
925   public long getEvictedCount() {
926     return cacheStats.getEvictedCount();
927   }
928 
929   /**
930    * Evicts all blocks for a specific HFile.
931    * <p>
932    * This is used for evict-on-close to remove all blocks of a specific HFile.
933    *
934    * @return the number of blocks evicted
935    */
936   @Override
937   public int evictBlocksByHfileName(String hfileName) {
938     // Copy the list to avoid ConcurrentModificationException
939     // as evictBlockKey removes the key from the index
940     Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName);
941     if (keySet == null) {
942       return 0;
943     }
944     int numEvicted = 0;
945     List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
946     for (BlockCacheKey key : keysForHFile) {
947       if (evictBlock(key)) {
948           ++numEvicted;
949       }
950     }
951     
952     return numEvicted;
953   }
954 
955 
  // Per-column-family summaries are not tracked by the bucket cache.
  @Override
  public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
      Configuration conf) {
    throw new UnsupportedOperationException();
  }
961 
962   static enum BlockPriority {
963     /**
964      * Accessed a single time (used for scan-resistance)
965      */
966     SINGLE,
967     /**
968      * Accessed multiple times
969      */
970     MULTI,
971     /**
972      * Block from in-memory store
973      */
974     MEMORY
975   };
976 
977   /**
978    * Item in cache. We expect this to be where most memory goes. Java uses 8
979    * bytes just for object headers; after this, we want to use as little as
980    * possible - so we only use 8 bytes, but in order to do so we end up messing
981    * around with all this Java casting stuff. Offset stored as 5 bytes that make
982    * up the long. Doubt we'll see devices this big for ages. Offsets are divided
983    * by 256. So 5 bytes gives us 256TB or so.
984    */
985   static class BucketEntry implements Serializable, Comparable<BucketEntry> {
986     private static final long serialVersionUID = -6741504807982257534L;
987     private int offsetBase;
988     private int length;
989     private byte offset1;
990     byte deserialiserIndex;
991     private volatile long accessTime;
992     private BlockPriority priority;
993 
994     BucketEntry(long offset, int length, long accessTime, boolean inMemory) {
995       setOffset(offset);
996       this.length = length;
997       this.accessTime = accessTime;
998       if (inMemory) {
999         this.priority = BlockPriority.MEMORY;
1000       } else {
1001         this.priority = BlockPriority.SINGLE;
1002       }
1003     }
1004 
1005     long offset() { // Java has no unsigned numbers
1006       long o = ((long) offsetBase) & 0xFFFFFFFF;
1007       o += (((long) (offset1)) & 0xFF) << 32;
1008       return o << 8;
1009     }
1010 
1011     private void setOffset(long value) {
1012       assert (value & 0xFF) == 0;
1013       value >>= 8;
1014       offsetBase = (int) value;
1015       offset1 = (byte) (value >> 32);
1016     }
1017 
1018     public int getLength() {
1019       return length;
1020     }
1021 
1022     protected CacheableDeserializer<Cacheable> deserializerReference(
1023         UniqueIndexMap<Integer> deserialiserMap) {
1024       return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
1025           .unmap(deserialiserIndex));
1026     }
1027 
1028     protected void setDeserialiserReference(
1029         CacheableDeserializer<Cacheable> deserializer,
1030         UniqueIndexMap<Integer> deserialiserMap) {
1031       this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
1032           .getDeserialiserIdentifier()));
1033     }
1034 
1035     /**
1036      * Block has been accessed. Update its local access time.
1037      */
1038     public void access(long accessTime) {
1039       this.accessTime = accessTime;
1040       if (this.priority == BlockPriority.SINGLE) {
1041         this.priority = BlockPriority.MULTI;
1042       }
1043     }
1044     
1045     public BlockPriority getPriority() {
1046       return this.priority;
1047     }
1048 
1049     @Override
1050     public int compareTo(BucketEntry that) {
1051       if(this.accessTime == that.accessTime) return 0;
1052       return this.accessTime < that.accessTime ? 1 : -1;
1053     }
1054 
1055     @Override
1056     public boolean equals(Object that) {
1057       return this == that;
1058     }
1059   }
1060 
1061   /**
1062    * Used to group bucket entries into priority buckets. There will be a
1063    * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
1064    * the eviction algorithm takes the appropriate number of elements out of each
1065    * according to configuration parameters and their relative sizes.
1066    */
1067   private class BucketEntryGroup implements Comparable<BucketEntryGroup> {
1068 
1069     private CachedEntryQueue queue;
1070     private long totalSize = 0;
1071     private long bucketSize;
1072 
1073     public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1074       this.bucketSize = bucketSize;
1075       queue = new CachedEntryQueue(bytesToFree, blockSize);
1076       totalSize = 0;
1077     }
1078 
1079     public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1080       totalSize += block.getValue().getLength();
1081       queue.add(block);
1082     }
1083 
1084     public long free(long toFree) {
1085       Map.Entry<BlockCacheKey, BucketEntry> entry;
1086       long freedBytes = 0;
1087       while ((entry = queue.pollLast()) != null) {
1088         evictBlock(entry.getKey());
1089         freedBytes += entry.getValue().getLength();
1090         if (freedBytes >= toFree) {
1091           return freedBytes;
1092         }
1093       }
1094       return freedBytes;
1095     }
1096 
1097     public long overflow() {
1098       return totalSize - bucketSize;
1099     }
1100 
1101     public long totalSize() {
1102       return totalSize;
1103     }
1104 
1105     @Override
1106     public int compareTo(BucketEntryGroup that) {
1107       if (this.overflow() == that.overflow())
1108         return 0;
1109       return this.overflow() > that.overflow() ? 1 : -1;
1110     }
1111 
1112     @Override
1113     public boolean equals(Object that) {
1114       return this == that;
1115     }
1116 
1117   }
1118 
1119   /**
1120    * Block Entry stored in the memory with key,data and so on
1121    */
1122   private static class RAMQueueEntry {
1123     private BlockCacheKey key;
1124     private Cacheable data;
1125     private long accessTime;
1126     private boolean inMemory;
1127 
1128     public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessTime,
1129         boolean inMemory) {
1130       this.key = bck;
1131       this.data = data;
1132       this.accessTime = accessTime;
1133       this.inMemory = inMemory;
1134     }
1135 
1136     public Cacheable getData() {
1137       return data;
1138     }
1139 
1140     public BlockCacheKey getKey() {
1141       return key;
1142     }
1143 
1144     public void access(long accessTime) {
1145       this.accessTime = accessTime;
1146     }
1147 
1148     public BucketEntry writeToCache(final IOEngine ioEngine,
1149         final BucketAllocator bucketAllocator,
1150         final UniqueIndexMap<Integer> deserialiserMap,
1151         final AtomicLong realCacheSize) throws CacheFullException, IOException,
1152         BucketAllocatorException {
1153       int len = data.getSerializedLength();
1154       // This cacheable thing can't be serialized...
1155       if (len == 0) return null;
1156       long offset = bucketAllocator.allocateBlock(len);
1157       BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime,
1158           inMemory);
1159       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
1160       try {
1161         if (data instanceof HFileBlock) {
1162           ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader();
1163           sliceBuf.rewind();
1164           assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1165           ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
1166           ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer);
1167           ioEngine.write(sliceBuf, offset);
1168           ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
1169         } else {
1170           ByteBuffer bb = ByteBuffer.allocate(len);
1171           data.serialize(bb);
1172           ioEngine.write(bb, offset);
1173         }
1174       } catch (IOException ioe) {
1175         // free it in bucket allocator
1176         bucketAllocator.freeBlock(offset);
1177         throw ioe;
1178       }
1179       
1180       realCacheSize.addAndGet(len);
1181       return bucketEntry;
1182     }
1183   }
1184 
1185   /**
1186    * Only used in test
1187    * @throws InterruptedException
1188    */
1189   void stopWriterThreads() throws InterruptedException {
1190     for (WriterThread writerThread : writerThreads) {
1191       writerThread.disableWriter();
1192       writerThread.interrupt();
1193       writerThread.join();
1194     }
1195   }
1196 
1197 }