/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.util.ConcurrentIndex;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.util.StringUtils;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * BucketCache uses {@link BucketAllocator} to allocate and free blocks, and uses
 * {@link BucketCache#ramCache} and {@link BucketCache#backingMap} to determine
 * whether a given element is in the cache. The bucket cache can store block data
 * on-heap or off-heap in memory ({@link ByteBufferIOEngine}) or in a file
 * ({@link FileIOEngine}).
 *
 * <p>Eviction uses an algorithm similar to that of
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}.
 *
 * <p>BucketCache is mainly used as a block cache (see
 * {@link CombinedBlockCache}), combined with LruBlockCache to decrease CMS GC and
 * heap fragmentation.
 *
 * <p>It can also be used as a secondary cache (e.g. using a file on SSD/FusionIO to
 * store blocks) to enlarge the cache space via
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}.
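 *
 * <p>A minimal construction sketch (the values here are illustrative only, not
 * recommended defaults):
 * <pre>{@code
 * int[] bucketSizes = new int[] { 8 * 1024 + 1024, 16 * 1024 + 1024, 64 * 1024 + 1024 };
 * // 1 GB off-heap cache, 64 KB approximate block size, three writer threads
 * // with 64-entry queues, and no persistence path; may throw IOException.
 * BucketCache cache = new BucketCache("offheap", 1024L * 1024 * 1024, 64 * 1024,
 *     bucketSizes, 3, 64, null);
 * }</pre>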
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  static final Log LOG = LogFactory.getLog(BucketCache.class);

  /** Priority buckets */
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;

  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
  private static final float DEFAULT_MIN_FACTOR = 0.85f;

  /** Statistics thread interval, in seconds */
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  // Store/read block data
  IOEngine ioEngine;

  // Store the block in this map before writing it to the cache
  private Map<BlockCacheKey, RAMQueueEntry> ramCache;
  // In this map, store the block's metadata, such as offset and length
  private Map<BlockCacheKey, BucketEntry> backingMap;

  /**
   * Flag indicating whether the cache is enabled. We shut it off if there are
   * IO errors for some time, so that bucket IO exceptions/errors don't bring
   * down the HBase server.
   */
  private volatile boolean cacheEnabled;

  private ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
      new ArrayList<BlockingQueue<RAMQueueEntry>>();
  WriterThread[] writerThreads;

  /** Volatile boolean tracking whether a freeSpace() run is in progress */
  private volatile boolean freeInProgress = false;
  private Lock freeSpaceLock = new ReentrantLock();

  private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();

  private final AtomicLong realCacheSize = new AtomicLong(0);
  private final AtomicLong heapSize = new AtomicLong(0);
  /** Current number of cached elements */
  private final AtomicLong blockNumber = new AtomicLong(0);
  private final AtomicLong failedBlockAdditions = new AtomicLong(0);

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong(0);

  private final Object[] cacheWaitSignals;
  private static final int DEFAULT_CACHE_WAIT_TIME = 50;
  // Currently only used in tests. If the flag is false and caching is very fast,
  // the bucket cache will skip some blocks when caching. If the flag is true, we
  // will wait until blocks are flushed to the IOEngine when caching.
  boolean wait_when_cache = false;

  private BucketCacheStats cacheStats = new BucketCacheStats();

  private String persistencePath;
  private long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;
  private final int[] bucketSizes;

  /** Duration of IO errors tolerated before we disable the cache, 1 min by default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
  // Start time of the first IO error when reading or writing the IO engine;
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  /**
   * A "sparse lock" implementation allowing us to lock on a particular block
   * identified by offset. The purpose of this is to avoid freeing a block
   * while it is being read.
   *
   * TODO: We could extend IdLock to an IdReadWriteLock for better concurrency.
   */
  private IdLock offsetLock = new IdLock();

  private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
      new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
        @Override
        public int compare(BlockCacheKey a, BlockCacheKey b) {
          if (a.getOffset() == b.getOffset()) {
            return 0;
          } else if (a.getOffset() < b.getOffset()) {
            return -1;
          }
          return 1;
        }
      });

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private BucketAllocator bucketAllocator;

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
      IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
      throws FileNotFoundException, IOException {
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
    this.writerThreads = new WriterThread[writerThreadNum];
    this.cacheWaitSignals = new Object[writerThreadNum];
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
      throw new IllegalArgumentException("Cache capacity is too large; only up to 32TB is supported for now");
    }

    this.cacheCapacity = capacity;
    this.persistencePath = persistencePath;
    this.blockSize = blockSize;
    this.bucketSizes = bucketSizes;
    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;

    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
    for (int i = 0; i < writerThreads.length; ++i) {
      writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
      this.cacheWaitSignals[i] = new Object();
    }

    assert writerQueues.size() == writerThreads.length;
    this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();

    this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);

    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        retrieveFromFile();
      } catch (IOException ioex) {
        LOG.error("Can't restore from file", ioex);
      } catch (ClassNotFoundException cnfe) {
        LOG.error("Can't restore from file in rebuild because we can't deserialise", cnfe);
        throw new RuntimeException(cnfe);
      }
    }
    final String threadName = Thread.currentThread().getName();
    this.cacheEnabled = true;
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i] = new WriterThread(writerQueues.get(i), i);
      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      writerThreads[i].start();
    }
    // Run the statistics thread periodically to print the cache statistics log
    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
    LOG.info("Started bucket cache; ioengine=" + ioEngineName +
        ", capacity=" + StringUtils.byteDesc(capacity) +
        ", blockSize=" + StringUtils.byteDesc(blockSize) +
        ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen +
        ", persistencePath=" + persistencePath +
        ", bucketAllocator=" + this.bucketAllocator);
  }

  public long getMaxSize() {
    return this.cacheCapacity;
  }

  public String getIoEngine() {
    return ioEngine.toString();
  }

  /**
   * Get the IOEngine from the IO engine name.
   * @param ioEngineName name of the engine: {@code file:<path>}, {@code offheap} or {@code heap}
   * @param capacity capacity of the engine, in bytes
   * @return the IOEngine
   * @throws IOException
   */
  private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
      throws IOException {
    if (ioEngineName.startsWith("file:")) {
      return new FileIOEngine(ioEngineName.substring(5), capacity);
    } else if (ioEngineName.startsWith("offheap")) {
      return new ByteBufferIOEngine(capacity, true);
    } else if (ioEngineName.startsWith("heap")) {
      return new ByteBufferIOEngine(capacity, false);
    } else {
      throw new IllegalArgumentException(
          "Don't understand io engine name for cache - prefix with file:, heap or offheap");
    }
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param buf block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false, false);
  }

  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   * @param inMemory if block is in-memory
   * @param cacheDataInL1 not used by BucketCache; blocks are always cached here (L2)
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
      final boolean cacheDataInL1) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
  }

  /**
   * Cache the block to ramCache.
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   * @param inMemory if block is in-memory
   * @param wait if true, wait (blocking) when the queue is full
   */
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem,
      boolean inMemory, boolean wait) {
    if (!cacheEnabled) {
      return;
    }

    if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
      return;
    }

    /*
     * Stuff the entry into the RAM cache so it can get drained to the
     * persistent store
     */
    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem,
        accessCount.incrementAndGet(), inMemory);
    ramCache.put(cacheKey, re);
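    // Pick a writer queue deterministically from the key's hash; masking with
    // 0x7FFFFFFF clears the sign bit so the modulo result is never negative.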
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = bq.offer(re);
    if (!successfulAddition && wait) {
      synchronized (cacheWaitSignals[queueNum]) {
        try {
          successfulAddition = bq.offer(re);
          if (!successfulAddition) {
            cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME);
          }
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      ramCache.remove(cacheKey);
      failedBlockAdditions.incrementAndGet();
    } else {
      this.blockNumber.incrementAndGet();
      this.heapSize.addAndGet(cachedItem.heapSize());
      blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
    }
  }

  /**
   * Get the buffer of the block with the specified key.
   * @param key block's cache key
   * @param caching true if the caller caches blocks on cache misses
   * @param repeat Whether this is a repeat lookup for the same block
   * @param updateCacheMetrics Whether we should update cache metrics or not
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    if (!cacheEnabled) {
      return null;
    }
    RAMQueueEntry re = ramCache.get(key);
    if (re != null) {
      if (updateCacheMetrics) {
        cacheStats.hit(caching);
      }
      re.access(accessCount.incrementAndGet());
      return re.getData();
    }
    BucketEntry bucketEntry = backingMap.get(key);
    if (bucketEntry != null) {
      long start = System.nanoTime();
      IdLock.Entry lockEntry = null;
      try {
        lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
        if (bucketEntry.equals(backingMap.get(key))) {
          int len = bucketEntry.getLength();
          ByteBuffer bb = ByteBuffer.allocate(len);
          int lenRead = ioEngine.read(bb, bucketEntry.offset());
          if (lenRead != len) {
            throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
          }
          CacheableDeserializer<Cacheable> deserializer =
            bucketEntry.deserializerReference(this.deserialiserMap);
          Cacheable cachedBlock = deserializer.deserialize(bb, true);
          long timeTaken = System.nanoTime() - start;
          if (updateCacheMetrics) {
            cacheStats.hit(caching);
            cacheStats.ioHit(timeTaken);
          }
          bucketEntry.access(accessCount.incrementAndGet());
          if (this.ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          return cachedBlock;
        }
      } catch (IOException ioex) {
        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
        checkIOErrorIsTolerated();
      } finally {
        if (lockEntry != null) {
          offsetLock.releaseLockEntry(lockEntry);
        }
      }
    }
    if (!repeat && updateCacheMetrics) {
      cacheStats.miss(caching);
    }
    return null;
  }

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    if (!cacheEnabled) {
      return false;
    }
    RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
    if (removedBlock != null) {
      this.blockNumber.decrementAndGet();
      this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
    }
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry != null) {
      IdLock.Entry lockEntry = null;
      try {
        lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
        if (bucketEntry.equals(backingMap.remove(cacheKey))) {
          bucketAllocator.freeBlock(bucketEntry.offset());
          realCacheSize.addAndGet(-1 * bucketEntry.getLength());
          blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
          if (removedBlock == null) {
            this.blockNumber.decrementAndGet();
          }
        } else {
          return false;
        }
      } catch (IOException ie) {
        LOG.warn("Failed evicting block " + cacheKey, ie);
        return false;
      } finally {
        if (lockEntry != null) {
          offsetLock.releaseLockEntry(lockEntry);
        }
      }
    }
    cacheStats.evicted(bucketEntry == null ? 0 : bucketEntry.getCachedTime());
    return true;
  }

  /*
   * Statistics thread.  Periodically output cache statistics to the log.
   */
  private static class StatisticsThread extends Thread {
    private final BucketCache bucketCache;

    public StatisticsThread(BucketCache bucketCache) {
      super("BucketCacheStatsThread");
      setDaemon(true);
      this.bucketCache = bucketCache;
    }

    @Override
    public void run() {
      bucketCache.logStats();
    }
  }

  public void logStats() {
    long totalSize = bucketAllocator.getTotalSize();
    long usedSize = bucketAllocator.getUsedSize();
    long freeSize = totalSize - usedSize;
    long cacheSize = getRealCacheSize();
    LOG.info("failedBlockAdditions=" + getFailedBlockAdditions() + ", " +
        "totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
        "usedSize=" + StringUtils.byteDesc(usedSize) + ", " +
        "cacheSize=" + StringUtils.byteDesc(cacheSize) + ", " +
        "accesses=" + cacheStats.getRequestCount() + ", " +
        "hits=" + cacheStats.getHitCount() + ", " +
        "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
        "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " +
        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
          (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) +
        "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
        "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
        "cachingHitsRatio=" + (cacheStats.getHitCachingCount() == 0 ? "0," :
          (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) +
        "evictions=" + cacheStats.getEvictionCount() + ", " +
        "evicted=" + cacheStats.getEvictedCount() + ", " +
        "evictedPerRun=" + cacheStats.evictedPerEviction());
    cacheStats.reset();
  }

  public long getFailedBlockAdditions() {
    return this.failedBlockAdditions.get();
  }

  public long getRealCacheSize() {
    return this.realCacheSize.get();
  }

  private long acceptableSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR);
  }

  private long singleSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize()
        * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR);
  }

  private long multiSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR
        * DEFAULT_MIN_FACTOR);
  }

  private long memorySize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR
        * DEFAULT_MIN_FACTOR);
  }

  /**
   * Free space if the used size exceeds acceptableSize() or if a block of a
   * given size could not be allocated. Freeing uses an LRU algorithm and
   * guarantees that at least some blocks are evicted.
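   *
   * <p>Illustrative arithmetic, under the default factors above: with
   * DEFAULT_MIN_FACTOR = 0.85, a bucket size whose buckets hold 1000 blocks in
   * total has a free goal of floor(1000 * 0.15) = 150 free slots. If only 40
   * are free, this pass tries to reclaim the bytes of 110 blocks of that size,
   * and DEFAULT_EXTRA_FREE_FACTOR adds 10% on top when computing
   * bytesToFreeWithExtra below.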
   */
  private void freeSpace() {
    // Ensure only one freeSpace progresses at a time
    if (!freeSpaceLock.tryLock()) {
      return;
    }
    try {
      freeInProgress = true;
      long bytesToFreeWithoutExtra = 0;
      /*
       * Calculate the number of bytes to free for each bucket size (BucketSizeInfo)
       */
      StringBuilder msgBuffer = new StringBuilder();
      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
      long[] bytesToFreeForBucket = new long[stats.length];
      for (int i = 0; i < stats.length; i++) {
        bytesToFreeForBucket[i] = 0;
        long freeGoal = (long) Math.floor(stats[i].totalCount()
            * (1 - DEFAULT_MIN_FACTOR));
        freeGoal = Math.max(freeGoal, 1);
        if (stats[i].freeCount() < freeGoal) {
          bytesToFreeForBucket[i] = stats[i].itemSize()
              * (freeGoal - stats[i].freeCount());
          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
          msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
              + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
        }
      }
      msgBuffer.append("Free for total="
          + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");

      if (bytesToFreeWithoutExtra <= 0) {
        return;
      }
      long currentSize = bucketAllocator.getUsedSize();
      long totalSize = bucketAllocator.getTotalSize();
      LOG.debug("Bucket cache free space started; attempting to " + msgBuffer.toString()
          + " of current used=" + StringUtils.byteDesc(currentSize)
          + ", actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get())
          + ", total=" + StringUtils.byteDesc(totalSize));

      long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
          * (1 + DEFAULT_EXTRA_FREE_FACTOR));

      // Instantiate priority buckets
      BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
          blockSize, singleSize());
      BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
          blockSize, multiSize());
      BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
          blockSize, memorySize());

      // Scan the entire map, putting each bucket entry into the appropriate
      // bucket entry group
      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
        switch (bucketEntryWithKey.getValue().getPriority()) {
          case SINGLE: {
            bucketSingle.add(bucketEntryWithKey);
            break;
          }
          case MULTI: {
            bucketMulti.add(bucketEntryWithKey);
            break;
          }
          case MEMORY: {
            bucketMemory.add(bucketEntryWithKey);
            break;
          }
        }
      }

      PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3);

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = 3;
      long bytesFreed = 0;

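      // Drain the groups smallest-overflow first (the queue orders by
      // overflow); each group frees at most its own overflow, and the
      // remaining deficit is split evenly across the groups still queued.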
      BucketEntryGroup bucketGroup;
      while ((bucketGroup = bucketQueue.poll()) != null) {
        long overflow = bucketGroup.overflow();
        if (overflow > 0) {
          long bucketBytesToFree = Math.min(overflow,
              (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
          bytesFreed += bucketGroup.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }

      /*
       * Check whether we need an extra round of freeing because some bucket
       * size still lacks free space
       */
      stats = bucketAllocator.getIndexStatistics();
      boolean needFreeForExtra = false;
      for (int i = 0; i < stats.length; i++) {
        long freeGoal = (long) Math.floor(stats[i].totalCount()
            * (1 - DEFAULT_MIN_FACTOR));
        freeGoal = Math.max(freeGoal, 1);
        if (stats[i].freeCount() < freeGoal) {
          needFreeForExtra = true;
          break;
        }
      }

      if (needFreeForExtra) {
        bucketQueue.clear();
        remainingBuckets = 2;

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);

        while ((bucketGroup = bucketQueue.poll()) != null) {
          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed)
              / remainingBuckets;
          bytesFreed += bucketGroup.free(bucketBytesToFree);
          remainingBuckets--;
        }
      }

      if (LOG.isDebugEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.debug("Bucket cache free space completed; " + "freed="
            + StringUtils.byteDesc(bytesFreed) + ", " + "total="
            + StringUtils.byteDesc(totalSize) + ", " + "single="
            + StringUtils.byteDesc(single) + ", " + "multi="
            + StringUtils.byteDesc(multi) + ", " + "memory="
            + StringUtils.byteDesc(memory));
      }

    } finally {
      cacheStats.evict();
      freeInProgress = false;
      freeSpaceLock.unlock();
    }
  }

  // This handles flushing the RAM cache to the IOEngine.
  private class WriterThread extends HasThread {
    BlockingQueue<RAMQueueEntry> inputQueue;
    final int threadNO;
    boolean writerEnabled = true;

    WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
      super();
      this.inputQueue = queue;
      this.threadNO = threadNO;
      setDaemon(true);
    }

    // Used for tests
    void disableWriter() {
      this.writerEnabled = false;
    }

    @Override
    public void run() {
      List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            try {
              // Blocks until at least one entry is available, then drains the
              // rest of the queue into the same batch
              entries.add(inputQueue.take());
              inputQueue.drainTo(entries);
              synchronized (cacheWaitSignals[threadNO]) {
                cacheWaitSignals[threadNO].notifyAll();
              }
            } catch (InterruptedException ie) {
              if (!cacheEnabled) break;
            }
            doDrain(entries);
          } catch (Exception e) {
            LOG.error("WriterThread encountered error", e);
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed doing drain", t);
      }
      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
    }

    /**
     * Flush the entries in ramCache to the IOEngine and add a bucket entry to
     * backingMap for each one flushed.
     * @param entries the RAM queue entries to drain
     * @throws InterruptedException
     */
    private void doDrain(List<RAMQueueEntry> entries)
        throws InterruptedException {
      BucketEntry[] bucketEntries = new BucketEntry[entries.size()];
      RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()];
      int done = 0;
      while (entries.size() > 0 && cacheEnabled) {
        // Keep going in case we throw...
        RAMQueueEntry ramEntry = null;
        try {
          ramEntry = entries.remove(entries.size() - 1);
          if (ramEntry == null) {
            LOG.warn("Couldn't get an entry from the RAM queue; was it removed concurrently?");
            continue;
          }
          BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
              bucketAllocator, deserialiserMap, realCacheSize);
          ramEntries[done] = ramEntry;
          bucketEntries[done++] = bucketEntry;
          if (ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
        } catch (BucketAllocatorException fle) {
          LOG.warn("Failed allocating for block "
              + (ramEntry == null ? "" : ramEntry.getKey()), fle);
        } catch (CacheFullException cfe) {
          if (!freeInProgress) {
            freeSpace();
          } else {
            Thread.sleep(50);
          }
        } catch (IOException ioex) {
          LOG.error("Failed writing to bucket cache", ioex);
          checkIOErrorIsTolerated();
        }
      }

      // Make sure that the data pages we have written are on the media before
      // we update the map.
      try {
        ioEngine.sync();
      } catch (IOException ioex) {
        LOG.error("Failed syncing IO engine", ioex);
        checkIOErrorIsTolerated();
        // Since we failed to sync, free the blocks in the bucket allocator
        for (int i = 0; i < done; ++i) {
          if (bucketEntries[i] != null) {
            bucketAllocator.freeBlock(bucketEntries[i].offset());
          }
        }
        done = 0;
      }

      for (int i = 0; i < done; ++i) {
        if (bucketEntries[i] != null) {
          backingMap.put(ramEntries[i].getKey(), bucketEntries[i]);
        }
        RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey());
        if (ramCacheEntry != null) {
          heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize());
        }
      }

      if (bucketAllocator.getUsedSize() > acceptableSize()) {
        freeSpace();
      }
    }
  }
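  // Persisted layout, written in order: cache capacity (long), the IOEngine
  // class name, the backing map class name, the deserialiser index map, and
  // finally the backing map itself. retrieveFromFile() reads these back in
  // the same order.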
  private void persistToFile() throws IOException {
    assert !cacheEnabled;
    FileOutputStream fos = null;
    ObjectOutputStream oos = null;
    try {
      if (!ioEngine.isPersistent()) {
        throw new IOException("Attempt to persist non-persistent cache mappings!");
      }
      fos = new FileOutputStream(persistencePath, false);
      oos = new ObjectOutputStream(fos);
      oos.writeLong(cacheCapacity);
      oos.writeUTF(ioEngine.getClass().getName());
      oos.writeUTF(backingMap.getClass().getName());
      oos.writeObject(deserialiserMap);
      oos.writeObject(backingMap);
    } finally {
      if (oos != null) oos.close();
      if (fos != null) fos.close();
    }
  }

  @SuppressWarnings("unchecked")
  private void retrieveFromFile() throws IOException, BucketAllocatorException,
      ClassNotFoundException {
    File persistenceFile = new File(persistencePath);
    if (!persistenceFile.exists()) {
      return;
    }
    assert !cacheEnabled;
    FileInputStream fis = null;
    ObjectInputStream ois = null;
    try {
      if (!ioEngine.isPersistent()) {
        throw new IOException("Attempt to restore non-persistent cache mappings!");
      }
      fis = new FileInputStream(persistencePath);
      ois = new ObjectInputStream(fis);
      long capacitySize = ois.readLong();
      if (capacitySize != cacheCapacity) {
        throw new IOException("Mismatched cache capacity: "
            + StringUtils.byteDesc(capacitySize) + ", expected: "
            + StringUtils.byteDesc(cacheCapacity));
      }
      String ioclass = ois.readUTF();
      String mapclass = ois.readUTF();
      if (!ioEngine.getClass().getName().equals(ioclass)) {
        throw new IOException("Class name for IO engine mismatch: " + ioclass
            + ", expected: " + ioEngine.getClass().getName());
      }
      if (!backingMap.getClass().getName().equals(mapclass)) {
        throw new IOException("Class name for cache map mismatch: " + mapclass
            + ", expected: " + backingMap.getClass().getName());
      }
      UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
          .readObject();
      // Read the backing map before handing it to the allocator, which rebuilds
      // its used-block accounting from the map's entries.
      backingMap = (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois
          .readObject();
      BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
          backingMap, realCacheSize);
      bucketAllocator = allocator;
      deserialiserMap = deserMap;
    } finally {
      if (ois != null) ois.close();
      if (fis != null) fis.close();
      if (!persistenceFile.delete()) {
        throw new IOException("Failed deleting persistence file "
            + persistenceFile.getAbsolutePath());
      }
    }
  }

  /**
   * Check whether we tolerate the IO error this time. If the duration of the
   * IOEngine throwing errors exceeds ioErrorsTolerationDuration, we disable
   * the cache.
   */
  private void checkIOErrorIsTolerated() {
    long now = EnvironmentEdgeManager.currentTimeMillis();
    if (this.ioErrorStartTime > 0) {
      if (cacheEnabled
          && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
        LOG.error("IO errors duration time has exceeded "
            + ioErrorsTolerationDuration
            + "ms, disabling cache, please check your IOEngine");
        disableCache();
      }
    } else {
      this.ioErrorStartTime = now;
    }
  }

  /**
   * Used to shut down the cache -or- turn it off in the case of something
   * broken.
   */
  private void disableCache() {
    if (!cacheEnabled) {
      return;
    }
    cacheEnabled = false;
    ioEngine.shutdown();
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i].interrupt();
    }
    this.ramCache.clear();
    if (!ioEngine.isPersistent() || persistencePath == null) {
      this.backingMap.clear();
    }
  }

  private void join() throws InterruptedException {
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i].join();
    }
  }

  @Override
  public void shutdown() {
    disableCache();
    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
        + "; path to write=" + persistencePath);
    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        join();
        persistToFile();
      } catch (IOException ex) {
        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
      } catch (InterruptedException e) {
        LOG.warn("Failed to persist data on exit", e);
      }
    }
  }

  @Override
  public CacheStats getStats() {
    return cacheStats;
  }

  public BucketAllocator getAllocator() {
    return this.bucketAllocator;
  }

  @Override
  public long heapSize() {
    return this.heapSize.get();
  }

  @Override
  public long size() {
    return this.realCacheSize.get();
  }

  @Override
  public long getFreeSize() {
    return this.bucketAllocator.getFreeSize();
  }

  @Override
  public long getBlockCount() {
    return this.blockNumber.get();
  }

  @Override
  public long getCurrentSize() {
    return this.bucketAllocator.getUsedSize();
  }

  /**
   * Evicts all blocks for a specific HFile.
   * <p>
   * This is used for evict-on-close to remove all blocks of a specific HFile.
   *
   * @return the number of blocks evicted
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    // Copy the list to avoid ConcurrentModificationException
    // as evictBlock removes the key from the index
    Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName);
    if (keySet == null) {
      return 0;
    }
    int numEvicted = 0;
    List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
    for (BlockCacheKey key : keysForHFile) {
      if (evictBlock(key)) {
        ++numEvicted;
      }
    }

    return numEvicted;
  }

  /**
   * Item in cache. We expect this to be where most memory goes. Java uses 8
   * bytes just for object headers; after this, we want to use as little as
   * possible - so we only use 8 bytes, but in order to do so we end up messing
   * around with all this Java casting stuff. The offset is stored as 5 bytes
   * that make up a long; offsets are divided by 256, so 5 bytes give us 256TB
   * or so. Doubt we'll see devices this big for ages.
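   *
   * <p>A worked example (illustrative): for a block at offset 1TB
   * (0x100_0000_0000), setOffset() first shifts right by 8, giving
   * 0x1_0000_0000; the low 32 bits (0) land in offsetBase and the next 8 bits
   * (1) in offset1. offset() reassembles the 40 bits and shifts left by 8 to
   * recover 0x100_0000_0000.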
   */
  static class BucketEntry implements Serializable, Comparable<BucketEntry> {
    private static final long serialVersionUID = -6741504807982257534L;
    private int offsetBase;
    private int length;
    private byte offset1;
    byte deserialiserIndex;
    private volatile long accessTime;
    private BlockPriority priority;
    /**
     * Time this block was cached.  Presumes we are created just before we are added to the cache.
     */
    private final long cachedTime = System.nanoTime();

    BucketEntry(long offset, int length, long accessTime, boolean inMemory) {
      setOffset(offset);
      this.length = length;
      this.accessTime = accessTime;
      if (inMemory) {
        this.priority = BlockPriority.MEMORY;
      } else {
        this.priority = BlockPriority.SINGLE;
      }
    }

    long offset() { // Java has no unsigned numbers
      // The mask must be a long literal (0xFFFFFFFFL); an int 0xFFFFFFFF would
      // sign-extend to all ones and let a negative offsetBase corrupt the result.
      long o = ((long) offsetBase) & 0xFFFFFFFFL;
      o += (((long) (offset1)) & 0xFF) << 32;
      return o << 8;
    }

    private void setOffset(long value) {
      assert (value & 0xFF) == 0;
      value >>= 8;
      offsetBase = (int) value;
      offset1 = (byte) (value >> 32);
    }

    public int getLength() {
      return length;
    }

    protected CacheableDeserializer<Cacheable> deserializerReference(
        UniqueIndexMap<Integer> deserialiserMap) {
      return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
          .unmap(deserialiserIndex));
    }

    protected void setDeserialiserReference(
        CacheableDeserializer<Cacheable> deserializer,
        UniqueIndexMap<Integer> deserialiserMap) {
      this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
          .getDeserialiserIdentifier()));
    }

    /**
     * Block has been accessed. Update its local access time.
     */
    public void access(long accessTime) {
      this.accessTime = accessTime;
      if (this.priority == BlockPriority.SINGLE) {
        this.priority = BlockPriority.MULTI;
      }
    }

    public BlockPriority getPriority() {
      return this.priority;
    }

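    // Orders entries by descending access time: the most recently accessed
    // entry compares smallest, so least recently used entries sort last.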
    @Override
    public int compareTo(BucketEntry that) {
      if (this.accessTime == that.accessTime) return 0;
      return this.accessTime < that.accessTime ? 1 : -1;
    }

    @Override
    public boolean equals(Object that) {
      return this == that;
    }

    public long getCachedTime() {
      return cachedTime;
    }
  }

  /**
   * Used to group bucket entries into priority buckets. There will be a
   * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
   * the eviction algorithm takes the appropriate number of elements out of each
   * according to configuration parameters and their relative sizes.
   */
  private class BucketEntryGroup implements Comparable<BucketEntryGroup> {

    private CachedEntryQueue queue;
    private long totalSize = 0;
    private long bucketSize;

    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
      this.bucketSize = bucketSize;
      queue = new CachedEntryQueue(bytesToFree, blockSize);
      totalSize = 0;
    }

    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
      totalSize += block.getValue().getLength();
      queue.add(block);
    }

    public long free(long toFree) {
      Map.Entry<BlockCacheKey, BucketEntry> entry;
      long freedBytes = 0;
      while ((entry = queue.pollLast()) != null) {
        evictBlock(entry.getKey());
        freedBytes += entry.getValue().getLength();
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      return freedBytes;
    }

    public long overflow() {
      return totalSize - bucketSize;
    }

    public long totalSize() {
      return totalSize;
    }

    @Override
    public int compareTo(BucketEntryGroup that) {
      if (this.overflow() == that.overflow()) {
        return 0;
      }
      return this.overflow() > that.overflow() ? 1 : -1;
    }

    @Override
    public boolean equals(Object that) {
      return this == that;
    }
  }

  /**
   * Block entry stored in the in-memory queue, with its key, data, and access
   * metadata, waiting to be written to the IOEngine.
   */
  private static class RAMQueueEntry {
    private BlockCacheKey key;
    private Cacheable data;
    private long accessTime;
    private boolean inMemory;

    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessTime,
        boolean inMemory) {
      this.key = bck;
      this.data = data;
      this.accessTime = accessTime;
      this.inMemory = inMemory;
    }

    public Cacheable getData() {
      return data;
    }

    public BlockCacheKey getKey() {
      return key;
    }

    public void access(long accessTime) {
      this.accessTime = accessTime;
    }

    public BucketEntry writeToCache(final IOEngine ioEngine,
        final BucketAllocator bucketAllocator,
        final UniqueIndexMap<Integer> deserialiserMap,
        final AtomicLong realCacheSize) throws CacheFullException, IOException,
        BucketAllocatorException {
      int len = data.getSerializedLength();
      // This cacheable can't be serialized; skip it
      if (len == 0) return null;
      long offset = bucketAllocator.allocateBlock(len);
      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime,
          inMemory);
      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
      try {
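        // For HFileBlock we can reuse its backing buffer directly: write the
        // block payload first, then append the extra serialization metadata
        // at the tail of the allocation. Other cacheables serialize into a
        // scratch buffer which is written in one shot.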
        if (data instanceof HFileBlock) {
          ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader();
          sliceBuf.rewind();
          assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
          ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
          ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer);
          ioEngine.write(sliceBuf, offset);
          ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
        } else {
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb);
          ioEngine.write(bb, offset);
        }
      } catch (IOException ioe) {
        // Free the allocation in the bucket allocator
        bucketAllocator.freeBlock(offset);
        throw ioe;
      }

      realCacheSize.addAndGet(len);
      return bucketEntry;
    }
  }

  /**
   * Only used in tests.
   * @throws InterruptedException
   */
  void stopWriterThreads() throws InterruptedException {
    for (WriterThread writerThread : writerThreads) {
      writerThread.disableWriter();
      writerThread.interrupt();
      writerThread.join();
    }
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    // Don't bother with the ramCache since stuff is in there only a little while.
    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
        this.backingMap.entrySet().iterator();
    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public CachedBlock next() {
        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return e.getValue().getPriority();
          }

          @Override
          public BlockType getBlockType() {
            // Not held by BucketEntry.  Could add it if wanted on BucketEntry creation.
            return null;
          }

          @Override
          public long getOffset() {
            return e.getKey().getOffset();
          }

          @Override
          public long getSize() {
            return e.getValue().getLength();
          }

          @Override
          public long getCachedTime() {
            return e.getValue().getCachedTime();
          }

          @Override
          public String getFilename() {
            return e.getKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) return diff;
            // Compare the longs directly; casting a long difference to int
            // can overflow and flip the sign.
            if (this.getOffset() != other.getOffset()) {
              return this.getOffset() < other.getOffset() ? -1 : 1;
            }
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException("" + this.getCachedTime() + ", " +
                other.getCachedTime());
            }
            return other.getCachedTime() < this.getCachedTime() ? -1 :
              other.getCachedTime() > this.getCachedTime() ? 1 : 0;
          }

          @Override
          public boolean equals(Object obj) {
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock) obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }
}