1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   *
16   * distributed under the License is distributed on an "AS IS" BASIS,
17   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   * See the License for the specific language governing permissions and
19   * limitations under the License.
20   */
21  package org.apache.hadoop.hbase.io.hfile.bucket;
22  
23  import java.io.File;
24  import java.io.FileInputStream;
25  import java.io.FileNotFoundException;
26  import java.io.FileOutputStream;
27  import java.io.IOException;
28  import java.io.ObjectInputStream;
29  import java.io.ObjectOutputStream;
30  import java.io.Serializable;
31  import java.nio.ByteBuffer;
32  import java.util.ArrayList;
33  import java.util.Comparator;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.PriorityQueue;
38  import java.util.Set;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.BlockingQueue;
41  import java.util.concurrent.ConcurrentHashMap;
42  import java.util.concurrent.Executors;
43  import java.util.concurrent.ScheduledExecutorService;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicLong;
46  import java.util.concurrent.locks.Lock;
47  import java.util.concurrent.locks.ReentrantLock;
48  
49  import org.apache.commons.logging.Log;
50  import org.apache.commons.logging.LogFactory;
51  import org.apache.hadoop.classification.InterfaceAudience;
52  import org.apache.hadoop.hbase.io.HeapSize;
53  import org.apache.hadoop.hbase.io.hfile.BlockCache;
54  import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
55  import org.apache.hadoop.hbase.io.hfile.BlockPriority;
56  import org.apache.hadoop.hbase.io.hfile.BlockType;
57  import org.apache.hadoop.hbase.io.hfile.CacheStats;
58  import org.apache.hadoop.hbase.io.hfile.Cacheable;
59  import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
60  import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
61  import org.apache.hadoop.hbase.io.hfile.CachedBlock;
62  import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
63  import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
64  import org.apache.hadoop.hbase.io.hfile.HFileBlock;
65  import org.apache.hadoop.hbase.util.ConcurrentIndex;
66  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
67  import org.apache.hadoop.hbase.util.HasThread;
68  import org.apache.hadoop.hbase.util.IdLock;
69  import org.apache.hadoop.util.StringUtils;
70  
71  import com.google.common.collect.ImmutableList;
72  import com.google.common.util.concurrent.ThreadFactoryBuilder;
73  
74  /**
75   * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
76   * {@link BucketCache#ramCache} and {@link BucketCache#backingMap} in order to
77   * determine if a given element is in the cache. The bucket cache can use
78   * on-heap or off-heap memory ({@link ByteBufferIOEngine}) or a file
79   * ({@link FileIOEngine}) to store/read the block data.
80   * 
81   * <p>Eviction uses an algorithm similar to that of
82   * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}.
83   * 
84   * <p>BucketCache is used mainly as a block cache (see
85   * {@link CombinedBlockCache}), combined with LruBlockCache, to decrease CMS GC
86   * and heap fragmentation.
87   * 
88   * <p>It can also be used as a secondary cache (e.g. using a file on ssd/fusionio
89   * to store blocks) to enlarge the cache space via
90   * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}.
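     *
     * <p>A minimal construction sketch (illustrative only: the engine name, sizes and
     * thread counts below are assumed values, and passing null bucketSizes/persistencePath
     * is assumed to select the allocator's default bucket sizes and disable persistence):
     * <pre>
     * BucketCache cache = new BucketCache("offheap",
     *     1024L * 1024 * 1024,   // 1GB capacity
     *     64 * 1024,             // approximate block size
     *     null,                  // bucket sizes; null picks defaults
     *     DEFAULT_WRITER_THREADS, DEFAULT_WRITER_QUEUE_ITEMS,
     *     null);                 // no persistence path
     * </pre>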
91   */
92  @InterfaceAudience.Private
93  public class BucketCache implements BlockCache, HeapSize {
94    static final Log LOG = LogFactory.getLog(BucketCache.class);
95  
96    /** Priority buckets */
97    private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
98    private static final float DEFAULT_MULTI_FACTOR = 0.50f;
99    private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
100   private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
101 
102   private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
103   private static final float DEFAULT_MIN_FACTOR = 0.85f;
104 
105   /** Statistics thread */
106   private static final int statThreadPeriod = 5 * 60;
107 
108   final static int DEFAULT_WRITER_THREADS = 3;
109   final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
110 
111   // Store/read block data
112   IOEngine ioEngine;
113 
114   // Store the block in this map before writing it to cache
115   private Map<BlockCacheKey, RAMQueueEntry> ramCache;
116   // In this map, store the block's meta data like offset, length
117   private Map<BlockCacheKey, BucketEntry> backingMap;
118 
119   /**
120    * Flag indicating whether the cache is enabled. We shut it off if there are
121    * sustained IO errors, so that Bucket IO exceptions/errors don't bring down
122    * the HBase server.
123    */
124   private volatile boolean cacheEnabled;
125 
126   private ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = 
127       new ArrayList<BlockingQueue<RAMQueueEntry>>();
128   WriterThread[] writerThreads;
129 
130   /** Volatile boolean to track whether a free-space run is in progress */
131   private volatile boolean freeInProgress = false;
132   private Lock freeSpaceLock = new ReentrantLock();
133 
134   private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();
135 
136   private final AtomicLong realCacheSize = new AtomicLong(0);
137   private final AtomicLong heapSize = new AtomicLong(0);
138   /** Current number of cached elements */
139   private final AtomicLong blockNumber = new AtomicLong(0);
140   private final AtomicLong failedBlockAdditions = new AtomicLong(0);
141 
142   /** Cache access count (sequential ID) */
143   private final AtomicLong accessCount = new AtomicLong(0);
144 
145   private final Object[] cacheWaitSignals;
146   private static final int DEFAULT_CACHE_WAIT_TIME = 50;
147   // Used in tests for now. If the flag is false and blocks arrive faster than
148   // they can be flushed, the bucket cache will skip caching some blocks. If the
149   // flag is true, we will wait for blocks to be flushed to the IOEngine when caching.
150   boolean wait_when_cache = false;
151 
152   private BucketCacheStats cacheStats = new BucketCacheStats();
153 
154   private String persistencePath;
155   private long cacheCapacity;
156   /** Approximate block size */
157   private final long blockSize;
158   private final int[] bucketSizes;
159 
160   /** Duration of IO errors tolerated before we disable the cache; 1 min by default */
161   private final int ioErrorsTolerationDuration;
162   // 1 min
163   public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
164   // Start time of the first IO error when reading from or writing to the
165   // IOEngine; it is reset after a successful read/write.
166   private volatile long ioErrorStartTime = -1;
167 
168   /**
169    * A "sparse lock" implementation allowing us to lock on a particular block
170    * identified by offset. The purpose of this is to avoid freeing a block
171    * while it is being read.
172    *
173    * TODO: We could extend IdLock to an IdReadWriteLock for better concurrency.
174    */
175   private IdLock offsetLock = new IdLock();
176 
177   private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
178       new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
179         @Override
180         public int compare(BlockCacheKey a, BlockCacheKey b) {
181           if (a.getOffset() == b.getOffset()) {
182             return 0;
183           } else if (a.getOffset() < b.getOffset()) {
184             return -1;
185           }
186           return 1;
187         }
188       });
189 
190   /** Statistics thread schedule pool (for heavy debugging, could remove) */
191   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
192     new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());
193 
194   // Allocate or free space for the block
195   private BucketAllocator bucketAllocator;
196 
197   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
198       int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
199       IOException {
200     this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
201       persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
202   }
203   
204   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
205       int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
206       throws FileNotFoundException, IOException {
207     this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
208     this.writerThreads = new WriterThread[writerThreadNum];
209     this.cacheWaitSignals = new Object[writerThreadNum];
210     long blockNumCapacity = capacity / blockSize;
211     if (blockNumCapacity >= Integer.MAX_VALUE) {
212       // Enough for about 32TB of cache!
213       throw new IllegalArgumentException("Cache capacity is too large, only supports 32TB now");
214     }
215 
216     this.cacheCapacity = capacity;
217     this.persistencePath = persistencePath;
218     this.blockSize = blockSize;
219     this.bucketSizes = bucketSizes;
220     this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
221 
222     bucketAllocator = new BucketAllocator(capacity, bucketSizes);
223     for (int i = 0; i < writerThreads.length; ++i) {
224       writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
225       this.cacheWaitSignals[i] = new Object();
226     }
227 
228     assert writerQueues.size() == writerThreads.length;
229     this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();
230 
231     this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);
232 
233     if (ioEngine.isPersistent() && persistencePath != null) {
234       try {
235         retrieveFromFile();
236       } catch (IOException ioex) {
237         LOG.error("Can't restore from file because of", ioex);
238       } catch (ClassNotFoundException cnfe) {
239         LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
240         throw new RuntimeException(cnfe);
241       }
242     }
243     final String threadName = Thread.currentThread().getName();
244     this.cacheEnabled = true;
245     for (int i = 0; i < writerThreads.length; ++i) {
246       writerThreads[i] = new WriterThread(writerQueues.get(i), i);
247       writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
248       writerThreads[i].start();
249     }
250     // Run the statistics thread periodically to print the cache statistics log
251     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
252     // every five minutes.
253     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
254         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
255     LOG.info("Started bucket cache; ioengine=" + ioEngineName +
256         ", capacity=" + StringUtils.byteDesc(capacity) +
257       ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
258         writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
259       persistencePath + ", bucketAllocator=" + this.bucketAllocator);
260   }
261 
262   public String getIoEngine() {
263     return ioEngine.toString();
264   }
265 
266   /**
267    * Get the IOEngine from the IO engine name
268    * @param ioEngineName the IO engine name, prefixed with "file:", "heap" or "offheap"
269    * @param capacity the cache capacity, in bytes
270    * @return the IOEngine
271    * @throws IOException if the engine cannot be created
272    */
273   private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
274       throws IOException {
275     if (ioEngineName.startsWith("file:"))
276       return new FileIOEngine(ioEngineName.substring(5), capacity);
277     else if (ioEngineName.startsWith("offheap"))
278       return new ByteBufferIOEngine(capacity, true);
279     else if (ioEngineName.startsWith("heap"))
280       return new ByteBufferIOEngine(capacity, false);
281     else
282       throw new IllegalArgumentException(
283           "Don't understand io engine name for cache - prefix with file:, heap or offheap");
284   }
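      // Illustrative engine names accepted above (the file path is hypothetical):
      //   "file:/mnt/ssd/bucketcache.data" -> FileIOEngine backed by the named file
      //   "offheap"                        -> ByteBufferIOEngine using direct buffers
      //   "heap"                           -> ByteBufferIOEngine using on-heap buffers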
285 
286   /**
287    * Cache the block with the specified name and buffer.
288    * @param cacheKey block's cache key
289    * @param buf block buffer
290    */
291   @Override
292   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
293     cacheBlock(cacheKey, buf, false, false);
294   }
295 
296   /**
297    * Cache the block with the specified name and buffer.
298    * @param cacheKey block's cache key
299    * @param cachedItem block buffer
300    * @param inMemory if block is in-memory
301    * @param cacheDataInL1 whether the block should be cached in the L1 cache; ignored by BucketCache
302    */
303   @Override
304   public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
305       final boolean cacheDataInL1) {
306     cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
307   }
308 
309   /**
310    * Cache the block to ramCache
311    * @param cacheKey block's cache key
312    * @param cachedItem block buffer
313    * @param inMemory if block is in-memory
314    * @param wait if true, blocking wait when queue is full
315    */
316   public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem,
317       boolean inMemory, boolean wait) {
318     if (!cacheEnabled)
319       return;
320 
321     if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
322       return;
323 
324     /*
325      * Stuff the entry into the RAM cache so it can get drained to the
326      * persistent store
327      */
328     RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem,
329         accessCount.incrementAndGet(), inMemory);
330     ramCache.put(cacheKey, re);
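        // Pick a writer queue by key hash; masking with 0x7FFFFFFF clears the
        // sign bit so the modulo below always yields a non-negative queue index.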
331     int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
332     BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
333     boolean successfulAddition = bq.offer(re);
334     if (!successfulAddition && wait) {
335       synchronized (cacheWaitSignals[queueNum]) {
336         try {
337           successfulAddition = bq.offer(re);
338           if (!successfulAddition) cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME);
339         } catch (InterruptedException ie) {
340           Thread.currentThread().interrupt();
341         }
342       }
343       if (!successfulAddition) successfulAddition = bq.offer(re); // retry once after waiting
344     }
345     if (!successfulAddition) {
346       ramCache.remove(cacheKey);
347       failedBlockAdditions.incrementAndGet();
348     } else {
349       this.blockNumber.incrementAndGet();
350       this.heapSize.addAndGet(cachedItem.heapSize());
351       blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
352     }
353   }
354 
355   /**
356    * Get the buffer of the block with the specified key.
357    * @param key block's cache key
358    * @param caching true if the caller caches blocks on cache misses
359    * @param repeat Whether this is a repeat lookup for the same block
360    * @param updateCacheMetrics Whether we should update cache metrics or not
361    * @return buffer of specified cache key, or null if not in cache
362    */
363   @Override
364   public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
365       boolean updateCacheMetrics) {
366     if (!cacheEnabled)
367       return null;
368     RAMQueueEntry re = ramCache.get(key);
369     if (re != null) {
370       if (updateCacheMetrics) cacheStats.hit(caching);
371       re.access(accessCount.incrementAndGet());
372       return re.getData();
373     }
374     BucketEntry bucketEntry = backingMap.get(key);
375     if (bucketEntry != null) {
376       long start = System.nanoTime();
377       IdLock.Entry lockEntry = null;
378       try {
379         lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
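            // Re-check under the offset lock: the entry may have been evicted
            // (and its bucket reused) while we were waiting for the lock.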
380         if (bucketEntry.equals(backingMap.get(key))) {
381           int len = bucketEntry.getLength();
382           ByteBuffer bb = ByteBuffer.allocate(len);
383           int lenRead = ioEngine.read(bb, bucketEntry.offset());
384           if (lenRead != len) {
385             throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
386           }
387           CacheableDeserializer<Cacheable> deserializer =
388             bucketEntry.deserializerReference(this.deserialiserMap);
389           Cacheable cachedBlock = deserializer.deserialize(bb, true);
390           long timeTaken = System.nanoTime() - start;
391           if (updateCacheMetrics) {
392             cacheStats.hit(caching);
393             cacheStats.ioHit(timeTaken);
394           }
395           bucketEntry.access(accessCount.incrementAndGet());
396           if (this.ioErrorStartTime > 0) {
397             ioErrorStartTime = -1;
398           }
399           return cachedBlock;
400         }
401       } catch (IOException ioex) {
402         LOG.error("Failed reading block " + key + " from bucket cache", ioex);
403         checkIOErrorIsTolerated();
404       } finally {
405         if (lockEntry != null) {
406           offsetLock.releaseLockEntry(lockEntry);
407         }
408       }
409     }
410     if (!repeat && updateCacheMetrics) cacheStats.miss(caching);
411     return null;
412   }
413 
414   @Override
415   public boolean evictBlock(BlockCacheKey cacheKey) {
416     if (!cacheEnabled) return false;
417     RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
418     if (removedBlock != null) {
419       this.blockNumber.decrementAndGet();
420       this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
421     }
422     BucketEntry bucketEntry = backingMap.get(cacheKey);
423     if (bucketEntry != null) {
424       IdLock.Entry lockEntry = null;
425       try {
426         lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
427         if (bucketEntry.equals(backingMap.remove(cacheKey))) {
428           bucketAllocator.freeBlock(bucketEntry.offset());
429           realCacheSize.addAndGet(-1 * bucketEntry.getLength());
430           blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
431           if (removedBlock == null) {
432             this.blockNumber.decrementAndGet();
433           }
434         } else {
435           return false;
436         }
437       } catch (IOException ie) {
438         LOG.warn("Failed evicting block " + cacheKey, ie);
439         return false;
440       } finally {
441         if (lockEntry != null) {
442           offsetLock.releaseLockEntry(lockEntry);
443         }
444       }
445     }
446     cacheStats.evicted();
447     return true;
448   }
449 
450   /*
451    * Statistics thread.  Periodically output cache statistics to the log.
452    */
453   private static class StatisticsThread extends Thread {
454     private final BucketCache bucketCache;
455 
456     public StatisticsThread(BucketCache bucketCache) {
457       super("BucketCacheStatsThread");
458       setDaemon(true);
459       this.bucketCache = bucketCache;
460     }
461 
462     @Override
463     public void run() {
464       bucketCache.logStats();
465     }
466   }
467 
468   public void logStats() {
469     long totalSize = bucketAllocator.getTotalSize();
470     long usedSize = bucketAllocator.getUsedSize();
471     long freeSize = totalSize - usedSize;
472     long cacheSize = getRealCacheSize();
473     LOG.info("failedBlockAdditions=" + getFailedBlockAdditions() + ", " +
474         "totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
475         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
476         "usedSize=" + StringUtils.byteDesc(usedSize) +", " +
477         "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " +
478         "accesses=" + cacheStats.getRequestCount() + ", " +
479         "hits=" + cacheStats.getHitCount() + ", " +
480         "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
481         "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
482         "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : 
483           (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
484         "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
485         "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
486         "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : 
487           (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
488         "evictions=" + cacheStats.getEvictionCount() + ", " +
489         "evicted=" + cacheStats.getEvictedCount() + ", " +
490         "evictedPerRun=" + cacheStats.evictedPerEviction());
491     cacheStats.reset();
492   }
493 
494   public long getFailedBlockAdditions() {
495     return this.failedBlockAdditions.get();
496   }
497 
498   public long getRealCacheSize() {
499     return this.realCacheSize.get();
500   }
501 
502   private long acceptableSize() {
503     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR);
504   }
505 
506   private long singleSize() {
507     return (long) Math.floor(bucketAllocator.getTotalSize()
508         * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR);
509   }
510 
511   private long multiSize() {
512     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR
513         * DEFAULT_MIN_FACTOR);
514   }
515 
516   private long memorySize() {
517     return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR
518         * DEFAULT_MIN_FACTOR);
519   }
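      // Worked example of the sizing math above, assuming a 10GB total cache:
      // acceptableSize() = 10GB * 0.95 = 9.5GB, and the per-priority minimums are
      // single = 10GB * 0.25 * 0.85 = 2.125GB, multi = 10GB * 0.50 * 0.85 = 4.25GB,
      // memory = 10GB * 0.25 * 0.85 = 2.125GB.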
520 
521   /**
522    * Free space if the used size reaches acceptableSize() or a block of the
523    * requested size cannot be allocated. When freeing space, we use the LRU
524    * algorithm and ensure that some blocks are actually evicted.
525    */
526   private void freeSpace() {
527     // Ensure only one freeSpace operation runs at a time
528     if (!freeSpaceLock.tryLock()) return;
529     try {
530       freeInProgress = true;
531       long bytesToFreeWithoutExtra = 0;
532       /*
533        * Calculate the bytes to free for each bucket size (BucketSizeInfo)
534        */
535       StringBuffer msgBuffer = new StringBuffer();
536       BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
537       long[] bytesToFreeForBucket = new long[stats.length];
538       for (int i = 0; i < stats.length; i++) {
539         bytesToFreeForBucket[i] = 0;
540         long freeGoal = (long) Math.floor(stats[i].totalCount()
541             * (1 - DEFAULT_MIN_FACTOR));
542         freeGoal = Math.max(freeGoal, 1);
543         if (stats[i].freeCount() < freeGoal) {
544           bytesToFreeForBucket[i] = stats[i].itemSize()
545               * (freeGoal - stats[i].freeCount());
546           bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
547           msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
548               + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
549         }
550       }
551       msgBuffer.append("Free for total="
552           + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
553 
554       if (bytesToFreeWithoutExtra <= 0) {
555         return;
556       }
557       long currentSize = bucketAllocator.getUsedSize();
558       long totalSize = bucketAllocator.getTotalSize();
559       LOG.debug("Bucket cache free space started; Attempting to " + msgBuffer.toString()
560           + " of current used=" + StringUtils.byteDesc(currentSize)
561           + ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get())
562           + ",total=" + StringUtils.byteDesc(totalSize));
563       
564       long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
565           * (1 + DEFAULT_EXTRA_FREE_FACTOR));
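          // E.g. if the per-bucket deficits add up to 100MB, the 10% extra-free
          // factor makes the target 110MB, so we are not immediately freeing again.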
566 
567       // Instantiate priority buckets
568       BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
569           blockSize, singleSize());
570       BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
571           blockSize, multiSize());
572       BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
573           blockSize, memorySize());
574 
575       // Scan entire map putting bucket entry into appropriate bucket entry
576       // group
577       for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
578         switch (bucketEntryWithKey.getValue().getPriority()) {
579           case SINGLE: {
580             bucketSingle.add(bucketEntryWithKey);
581             break;
582           }
583           case MULTI: {
584             bucketMulti.add(bucketEntryWithKey);
585             break;
586           }
587           case MEMORY: {
588             bucketMemory.add(bucketEntryWithKey);
589             break;
590           }
591         }
592       }
593 
594       PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3);
595 
596       bucketQueue.add(bucketSingle);
597       bucketQueue.add(bucketMulti);
598       bucketQueue.add(bucketMemory);
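          // The queue polls groups in order of increasing overflow, so a group
          // with little to give up is drained first and the unfreed remainder of
          // its fair share is redistributed over the groups still in the queue.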
599 
600       int remainingBuckets = 3;
601       long bytesFreed = 0;
602 
603       BucketEntryGroup bucketGroup;
604       while ((bucketGroup = bucketQueue.poll()) != null) {
605         long overflow = bucketGroup.overflow();
606         if (overflow > 0) {
607           long bucketBytesToFree = Math.min(overflow,
608               (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
609           bytesFreed += bucketGroup.free(bucketBytesToFree);
610         }
611         remainingBuckets--;
612       }
613 
614       /**
615        * Check whether an extra round of freeing is needed because some bucket
616        * size (BucketSizeInfo) still lacks free space
617        */
618       stats = bucketAllocator.getIndexStatistics();
619       boolean needFreeForExtra = false;
620       for (int i = 0; i < stats.length; i++) {
621         long freeGoal = (long) Math.floor(stats[i].totalCount()
622             * (1 - DEFAULT_MIN_FACTOR));
623         freeGoal = Math.max(freeGoal, 1);
624         if (stats[i].freeCount() < freeGoal) {
625           needFreeForExtra = true;
626           break;
627         }
628       }
629 
630       if (needFreeForExtra) {
631         bucketQueue.clear();
632         remainingBuckets = 2;
633 
634         bucketQueue.add(bucketSingle);
635         bucketQueue.add(bucketMulti);
636 
637         while ((bucketGroup = bucketQueue.poll()) != null) {
638           long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed)
639               / remainingBuckets;
640           bytesFreed += bucketGroup.free(bucketBytesToFree);
641           remainingBuckets--;
642         }
643       }
644 
645       if (LOG.isDebugEnabled()) {
646         long single = bucketSingle.totalSize();
647         long multi = bucketMulti.totalSize();
648         long memory = bucketMemory.totalSize();
649         LOG.debug("Bucket cache free space completed; " + "freed="
650             + StringUtils.byteDesc(bytesFreed) + ", " + "total="
651             + StringUtils.byteDesc(totalSize) + ", " + "single="
652             + StringUtils.byteDesc(single) + ", " + "multi="
653             + StringUtils.byteDesc(multi) + ", " + "memory="
654             + StringUtils.byteDesc(memory));
655       }
656 
657     } finally {
658       cacheStats.evict();
659       freeInProgress = false;
660       freeSpaceLock.unlock();
661     }
662   }
663 
664   // This handles flushing the RAM cache to IOEngine.
665   private class WriterThread extends HasThread {
666     BlockingQueue<RAMQueueEntry> inputQueue;
667     final int threadNO;
668     boolean writerEnabled = true;
669 
670     WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
671       super();
672       this.inputQueue = queue;
673       this.threadNO = threadNO;
674       setDaemon(true);
675     }
676     
677     // Used for test
678     void disableWriter() {
679       this.writerEnabled = false;
680     }
681 
682     public void run() {
683       List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
684       try {
685         while (cacheEnabled && writerEnabled) {
686           try {
687             // Blocks
688             entries.add(inputQueue.take());
689             inputQueue.drainTo(entries);
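                // take() blocked until at least one entry arrived; drainTo() then
                // empties whatever else is queued so doDrain() writes a full batch.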
690             synchronized (cacheWaitSignals[threadNO]) {
691               cacheWaitSignals[threadNO].notifyAll();
692             }
693           } catch (InterruptedException ie) {
694             if (!cacheEnabled) break;
695           }
696           doDrain(entries);
697         }
698       } catch (Throwable t) {
699         LOG.warn("Failed doing drain", t);
700       }
701       LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
702     }
703 
704     /**
705      * Flush the entries in ramCache to the IOEngine and add bucket entries to
706      * the backingMap
707      * @param entries the RAM queue entries to flush
708      * @throws InterruptedException if interrupted while sleeping before a retry
709      */
710     private void doDrain(List<RAMQueueEntry> entries)
711         throws InterruptedException {
712       BucketEntry[] bucketEntries = new BucketEntry[entries.size()];
713       RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()];
714       int done = 0;
715       while (entries.size() > 0 && cacheEnabled) {
716         // Keep going in case we throw...
717         RAMQueueEntry ramEntry = null;
718         try {
719           ramEntry = entries.remove(entries.size() - 1);
720           if (ramEntry == null) {
721             LOG.warn("Couldn't get the entry from RAM queue, who steals it?");
722             continue;
723           }
724           BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
725               bucketAllocator, deserialiserMap, realCacheSize);
726           ramEntries[done] = ramEntry;
727           bucketEntries[done++] = bucketEntry;
728           if (ioErrorStartTime > 0) {
729             ioErrorStartTime = -1;
730           }
731         } catch (BucketAllocatorException fle) {
732           LOG.warn("Failed allocating for block "
733               + (ramEntry == null ? "" : ramEntry.getKey()), fle);
734         } catch (CacheFullException cfe) {
735           if (!freeInProgress) {
736             freeSpace();
737           } else {
738             Thread.sleep(50);
739           }
740         } catch (IOException ioex) {
741           LOG.error("Failed writing to bucket cache", ioex);
742           checkIOErrorIsTolerated();
743         }
744       }
745 
746       // Make sure that the data pages we have written are on the media before
747       // we update the map.
748       try {
749         ioEngine.sync();
750       } catch (IOException ioex) {
751         LOG.error("Faild syncing IO engine", ioex);
752         checkIOErrorIsTolerated();
753         // Since we failed sync, free the blocks in bucket allocator
754         for (int i = 0; i < done; ++i) {
755           if (bucketEntries[i] != null) {
756             bucketAllocator.freeBlock(bucketEntries[i].offset());
757           }
758         }
759         done = 0;
760       }
761 
762       for (int i = 0; i < done; ++i) {
763         if (bucketEntries[i] != null) {
764           backingMap.put(ramEntries[i].getKey(), bucketEntries[i]);
765         }
766         RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey());
767         if (ramCacheEntry != null) {
768           heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize());
769         }
770       }
771 
772       if (bucketAllocator.getUsedSize() > acceptableSize()) {
773         freeSpace();
774       }
775     }
776   }
777 
778   
779 
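      // Persistence file layout, as written below and read back by retrieveFromFile():
      //   long   cacheCapacity
      //   UTF    IOEngine class name
      //   UTF    backing map class name
      //   Object deserialiserMap
      //   Object backingMap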
780   private void persistToFile() throws IOException {
781     assert !cacheEnabled;
782     FileOutputStream fos = null;
783     ObjectOutputStream oos = null;
784     try {
785       if (!ioEngine.isPersistent())
786         throw new IOException(
787             "Attempt to persist non-persistent cache mappings!");
788       fos = new FileOutputStream(persistencePath, false);
789       oos = new ObjectOutputStream(fos);
790       oos.writeLong(cacheCapacity);
791       oos.writeUTF(ioEngine.getClass().getName());
792       oos.writeUTF(backingMap.getClass().getName());
793       oos.writeObject(deserialiserMap);
794       oos.writeObject(backingMap);
795     } finally {
796       if (oos != null) oos.close();
797       if (fos != null) fos.close();
798     }
799   }
800 
801   @SuppressWarnings("unchecked")
802   private void retrieveFromFile() throws IOException, BucketAllocatorException,
803       ClassNotFoundException {
804     File persistenceFile = new File(persistencePath);
805     if (!persistenceFile.exists()) {
806       return;
807     }
808     assert !cacheEnabled;
809     FileInputStream fis = null;
810     ObjectInputStream ois = null;
811     try {
812       if (!ioEngine.isPersistent())
813         throw new IOException(
814             "Attempt to restore non-persistent cache mappings!");
815       fis = new FileInputStream(persistencePath);
816       ois = new ObjectInputStream(fis);
817       long capacitySize = ois.readLong();
818       if (capacitySize != cacheCapacity)
819         throw new IOException("Mismatched cache capacity:"
820             + StringUtils.byteDesc(capacitySize) + ", expected: "
821             + StringUtils.byteDesc(cacheCapacity));
822       String ioclass = ois.readUTF();
823       String mapclass = ois.readUTF();
824       if (!ioEngine.getClass().getName().equals(ioclass))
825         throw new IOException("Class name for IO engine mismatch: " + ioclass
826             + ", expected:" + ioEngine.getClass().getName());
827       if (!backingMap.getClass().getName().equals(mapclass))
828         throw new IOException("Class name for cache map mismatch: " + mapclass
829             + ", expected:" + backingMap.getClass().getName());
830       UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
831           .readObject();
832       backingMap = (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois
833           .readObject();
834       BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
835           backingMap, realCacheSize);
836       bucketAllocator = allocator;
837       deserialiserMap = deserMap;
838     } finally {
839       if (ois != null) ois.close();
840       if (fis != null) fis.close();
841       if (!persistenceFile.delete()) {
842         throw new IOException("Failed deleting persistence file "
843             + persistenceFile.getAbsolutePath());
844       }
845     }
846   }
847 
848   /**
849    * Check whether we tolerate the IO error this time. If the duration of the
850    * IOEngine throwing errors exceeds ioErrorsTolerationDuration, we will
851    * disable the cache
852    */
853   private void checkIOErrorIsTolerated() {
854     long now = EnvironmentEdgeManager.currentTimeMillis();
855     if (this.ioErrorStartTime > 0) {
856       if (cacheEnabled
857           && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
858         LOG.error("IO errors duration time has exceeded "
859             + ioErrorsTolerationDuration
860             + "ms, disabing cache, please check your IOEngine");
861         disableCache();
862       }
863     } else {
864       this.ioErrorStartTime = now;
865     }
866   }
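      // Example timeline with the default 60s toleration: an error at t=0 records
      // ioErrorStartTime; if errors persist past t=60s with no successful
      // read/write in between (success resets the clock), the cache is disabled.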
867 
868   /**
869    * Used to shut down the cache -or- turn it off in the case of something
870    * broken.
871    */
872   private void disableCache() {
873     if (!cacheEnabled)
874       return;
875     cacheEnabled = false;
876     ioEngine.shutdown();
877     this.scheduleThreadPool.shutdown();
878     for (int i = 0; i < writerThreads.length; ++i)
879       writerThreads[i].interrupt();
880     this.ramCache.clear();
881     if (!ioEngine.isPersistent() || persistencePath == null) {
882       this.backingMap.clear();
883     }
884   }
885 
886   private void join() throws InterruptedException {
887     for (int i = 0; i < writerThreads.length; ++i)
888       writerThreads[i].join();
889   }
890 
891   @Override
892   public void shutdown() {
893     disableCache();
894     LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
895         + "; path to write=" + persistencePath);
896     if (ioEngine.isPersistent() && persistencePath != null) {
897       try {
898         join();
899         persistToFile();
900       } catch (IOException ex) {
901         LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
902       } catch (InterruptedException e) {
903         LOG.warn("Failed to persist data on exit", e);
904       }
905     }
906   }
907 
908   @Override
909   public CacheStats getStats() {
910     return cacheStats;
911   }
912 
913   public BucketAllocator getAllocator() {
914     return this.bucketAllocator;
915   }
916 
917   @Override
918   public long heapSize() {
919     return this.heapSize.get();
920   }
921 
922   @Override
923   public long size() {
924     return this.realCacheSize.get();
925   }
926 
927   @Override
928   public long getFreeSize() {
929     return this.bucketAllocator.getFreeSize();
930   }
931 
932   @Override
933   public long getBlockCount() {
934     return this.blockNumber.get();
935   }
936 
937   @Override
938   public long getCurrentSize() {
939     return this.bucketAllocator.getUsedSize();
940   }
941 
942   /**
943    * Evicts all blocks for a specific HFile.
944    * <p>
945    * This is used for evict-on-close to remove all blocks of a specific HFile.
946    * @param hfileName the name of the HFile whose blocks are to be evicted
947    * @return the number of blocks evicted
948    */
949   @Override
950   public int evictBlocksByHfileName(String hfileName) {
951     // Copy the list to avoid ConcurrentModificationException
952     // as evictBlock removes the key from the index
953     Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName);
954     if (keySet == null) {
955       return 0;
956     }
957     int numEvicted = 0;
958     List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
959     for (BlockCacheKey key : keysForHFile) {
960       if (evictBlock(key)) {
961           ++numEvicted;
962       }
963     }
964     
965     return numEvicted;
966   }
967 
968   /**
969    * Item in cache. We expect this to be where most memory goes. Java uses 8
970    * bytes just for object headers; after this, we want to use as little as
971    * possible - so we only use 8 bytes, but in order to do so we end up messing
972    * around with all this Java casting stuff. Offset stored as 5 bytes that make
973    * up the long. Doubt we'll see devices this big for ages. Offsets are divided
974    * by 256. So 5 bytes gives us 256TB or so.
975    */
976   static class BucketEntry implements Serializable, Comparable<BucketEntry> {
977     private static final long serialVersionUID = -6741504807982257534L;
978     private int offsetBase;
979     private int length;
980     private byte offset1;
981     byte deserialiserIndex;
982     private volatile long accessTime;
983     private BlockPriority priority;
984     /**
985      * Time this block was cached.  Presumes we are created just before we are added to the cache.
986      */
987     private final long cachedTime = System.nanoTime();
988 
989     BucketEntry(long offset, int length, long accessTime, boolean inMemory) {
990       setOffset(offset);
991       this.length = length;
992       this.accessTime = accessTime;
993       if (inMemory) {
994         this.priority = BlockPriority.MEMORY;
995       } else {
996         this.priority = BlockPriority.SINGLE;
997       }
998     }
999 
1000     long offset() { // Java has no unsigned numbers
1001     long o = ((long) offsetBase) & 0xFFFFFFFFL; // long mask; an int mask would sign-extend
1002       o += (((long) (offset1)) & 0xFF) << 32;
1003       return o << 8;
1004     }
1005 
1006     private void setOffset(long value) {
1007       assert (value & 0xFF) == 0;
1008       value >>= 8;
1009       offsetBase = (int) value;
1010       offset1 = (byte) (value >> 32);
1011     }
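         // Worked example of the 5-byte offset encoding: for offset 0xABCDEF012300
         // (256-byte aligned), value >> 8 is 0xABCDEF0123; offsetBase keeps the low
         // 32 bits (0xCDEF0123) and offset1 the fifth byte (0xAB). offset() above
         // reassembles the 40 bits and shifts left by 8 to recover 0xABCDEF012300.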
1012 
1013     public int getLength() {
1014       return length;
1015     }
1016 
1017     protected CacheableDeserializer<Cacheable> deserializerReference(
1018         UniqueIndexMap<Integer> deserialiserMap) {
1019       return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
1020           .unmap(deserialiserIndex));
1021     }
1022 
1023     protected void setDeserialiserReference(
1024         CacheableDeserializer<Cacheable> deserializer,
1025         UniqueIndexMap<Integer> deserialiserMap) {
1026       this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
1027           .getDeserialiserIdentifier()));
1028     }
1029 
1030     /**
1031      * Block has been accessed. Update its local access time.
1032      */
1033     public void access(long accessTime) {
1034       this.accessTime = accessTime;
1035       if (this.priority == BlockPriority.SINGLE) {
1036         this.priority = BlockPriority.MULTI;
1037       }
1038     }
1039     
1040     public BlockPriority getPriority() {
1041       return this.priority;
1042     }
1043 
1044     @Override
1045     public int compareTo(BucketEntry that) {
1046     if (this.accessTime == that.accessTime) return 0;
1047       return this.accessTime < that.accessTime ? 1 : -1;
1048     }
1049 
1050     @Override
1051     public boolean equals(Object that) {
1052       return this == that;
1053     }
1054 
1055     public long getCachedTime() {
1056       return cachedTime;
1057     }
1058   }
1059 
1060   /**
1061    * Used to group bucket entries into priority buckets. There will be a
1062    * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
1063    * the eviction algorithm takes the appropriate number of elements out of each
1064    * according to configuration parameters and their relative sizes.
1065    */
1066   private class BucketEntryGroup implements Comparable<BucketEntryGroup> {
1067 
1068     private CachedEntryQueue queue;
1069     private long totalSize = 0;
1070     private long bucketSize;
1071 
1072     public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1073       this.bucketSize = bucketSize;
1074       queue = new CachedEntryQueue(bytesToFree, blockSize);
1075       totalSize = 0;
1076     }
1077 
1078     public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1079       totalSize += block.getValue().getLength();
1080       queue.add(block);
1081     }
1082 
1083     public long free(long toFree) {
1084       Map.Entry<BlockCacheKey, BucketEntry> entry;
1085       long freedBytes = 0;
1086       while ((entry = queue.pollLast()) != null) {
1087         evictBlock(entry.getKey());
1088         freedBytes += entry.getValue().getLength();
1089         if (freedBytes >= toFree) {
1090           return freedBytes;
1091         }
1092       }
1093       return freedBytes;
1094     }
1095 
1096     public long overflow() {
1097       return totalSize - bucketSize;
1098     }
1099 
1100     public long totalSize() {
1101       return totalSize;
1102     }
1103 
1104     @Override
1105     public int compareTo(BucketEntryGroup that) {
1106       if (this.overflow() == that.overflow())
1107         return 0;
1108       return this.overflow() > that.overflow() ? 1 : -1;
1109     }
1110 
1111     @Override
1112     public boolean equals(Object that) {
1113       return this == that;
1114     }
1115 
1116   }
1117 
1118   /**
1119    * Block entry stored in memory, with its key, data, and so on
1120    */
1121   private static class RAMQueueEntry {
1122     private BlockCacheKey key;
1123     private Cacheable data;
1124     private long accessTime;
1125     private boolean inMemory;
1126 
1127     public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessTime,
1128         boolean inMemory) {
1129       this.key = bck;
1130       this.data = data;
1131       this.accessTime = accessTime;
1132       this.inMemory = inMemory;
1133     }
1134 
1135     public Cacheable getData() {
1136       return data;
1137     }
1138 
1139     public BlockCacheKey getKey() {
1140       return key;
1141     }
1142 
1143     public void access(long accessTime) {
1144       this.accessTime = accessTime;
1145     }
1146 
1147     public BucketEntry writeToCache(final IOEngine ioEngine,
1148         final BucketAllocator bucketAllocator,
1149         final UniqueIndexMap<Integer> deserialiserMap,
1150         final AtomicLong realCacheSize) throws CacheFullException, IOException,
1151         BucketAllocatorException {
1152       int len = data.getSerializedLength();
1153       // This cacheable thing can't be serialized...
1154       if (len == 0) return null;
1155       long offset = bucketAllocator.allocateBlock(len);
1156       BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime,
1157           inMemory);
1158       bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
1159       try {
1160         if (data instanceof HFileBlock) {
1161           ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader();
1162           sliceBuf.rewind();
1163           assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1164           ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
1165           ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer);
1166           ioEngine.write(sliceBuf, offset);
1167           ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
1168         } else {
1169           ByteBuffer bb = ByteBuffer.allocate(len);
1170           data.serialize(bb);
1171           ioEngine.write(bb, offset);
1172         }
1173       } catch (IOException ioe) {
1174         // free it in bucket allocator
1175         bucketAllocator.freeBlock(offset);
1176         throw ioe;
1177       }
1178       
1179       realCacheSize.addAndGet(len);
1180       return bucketEntry;
1181     }
1182   }
1183 
1184   /**
1185    * Only used in tests
1186    * @throws InterruptedException if interrupted while joining a writer thread
1187    */
1188   void stopWriterThreads() throws InterruptedException {
1189     for (WriterThread writerThread : writerThreads) {
1190       writerThread.disableWriter();
1191       writerThread.interrupt();
1192       writerThread.join();
1193     }
1194   }
1195 
1196   @Override
1197   public Iterator<CachedBlock> iterator() {
1198     // Don't bother with ramcache since stuff is in here only a little while.
1199     final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
1200         this.backingMap.entrySet().iterator();
1201     return new Iterator<CachedBlock>() {
1202       private final long now = System.nanoTime();
1203 
1204       @Override
1205       public boolean hasNext() {
1206         return i.hasNext();
1207       }
1208 
1209       @Override
1210       public CachedBlock next() {
1211         final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1212         return new CachedBlock() {
1213           @Override
1214           public String toString() {
1215             return BlockCacheUtil.toString(this, now);
1216           }
1217 
1218           @Override
1219           public BlockPriority getBlockPriority() {
1220             return e.getValue().getPriority();
1221           }
1222 
1223           @Override
1224           public BlockType getBlockType() {
1225             // Not held by BucketEntry.  Could add it if wanted on BucketEntry creation.
1226             return null;
1227           }
1228 
1229           @Override
1230           public long getOffset() {
1231             return e.getKey().getOffset();
1232           }
1233 
1234           @Override
1235           public long getSize() {
1236             return e.getValue().getLength();
1237           }
1238 
1239           @Override
1240           public long getCachedTime() {
1241             return e.getValue().getCachedTime();
1242           }
1243 
1244           @Override
1245           public String getFilename() {
1246             return e.getKey().getHfileName();
1247           }
1248 
1249           @Override
1250           public int compareTo(CachedBlock other) {
1251             return (this.getOffset() < other.getOffset()) ? -1 : ((this.getOffset() == other.getOffset()) ? 0 : 1);
1252           }
1253         };
1254       }
1255 
1256       @Override
1257       public void remove() {
1258         throw new UnsupportedOperationException();
1259       }
1260     };
1261   }
1262 
1263   @Override
1264   public BlockCache[] getBlockCaches() {
1265     return null;
1266   }
1267 }