package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.util.ConcurrentIndex;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.util.StringUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
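/**
 * BucketCache is a {@link BlockCache} implementation that stores block data through an
 * {@link IOEngine} (on-heap or off-heap {@link ByteBufferIOEngine}, or a file via
 * {@link FileIOEngine}), using a {@link BucketAllocator} to manage the engine's space.
 * New blocks are staged in {@link #ramCache} and drained to the engine by writer threads;
 * once written, only the block's offset/length metadata is kept on heap, in
 * {@link #backingMap}. Eviction works over three priority groups (single-access,
 * multi-access, in-memory), freeing from whichever groups exceed their share.
 *
 * <p>A rough construction sketch, assuming the "offheap" engine; the sizes here are
 * illustrative examples, not recommended defaults:
 * <pre>
 *   int[] bucketSizes = { 8 * 1024, 16 * 1024, 64 * 1024 };  // bucket item sizes in bytes
 *   BucketCache cache = new BucketCache("offheap",
 *       1024L * 1024 * 1024,           // 1GB capacity
 *       64 * 1024,                     // approximate block size
 *       bucketSizes,
 *       DEFAULT_WRITER_THREADS, DEFAULT_WRITER_QUEUE_ITEMS,
 *       null);                         // persistencePath; unused by the offheap engine
 * </pre>
 */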
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Log LOG = LogFactory.getLog(BucketCache.class);
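  // Priority-bucket sizing: fractions of the cache devoted to single-access,
  // multi-access, and in-memory blocks; the extra fraction freed beyond the
  // immediate goal on eviction; and the acceptable/minimum fill factors the
  // eviction logic works between.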
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;

  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
  private static final float DEFAULT_MIN_FACTOR = 0.85f;

  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  // Statistics logging period, in seconds
  private static final int statThreadPeriod = 5 * 60;

  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
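  /** Store/read block data */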
  final IOEngine ioEngine;
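  /** Blocks waiting to be written to the IOEngine by the writer threads */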
  @VisibleForTesting
  final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
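  /** Metadata (offset and length inside the IOEngine) of every block already written out */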
  @VisibleForTesting
  ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
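  /** Flag if the cache is enabled or not; turned off on shutdown or after untolerated IO errors */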
  private volatile boolean cacheEnabled;
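  /**
   * Writer queues, one per writer thread. A cached block is hashed by its key to a
   * queue in cacheBlockWithWait() and drained to the IOEngine by the owning WriterThread.
   */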
  @VisibleForTesting
  final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
      new ArrayList<BlockingQueue<RAMQueueEntry>>();
  @VisibleForTesting
  final WriterThread[] writerThreads;

  /** Volatile boolean to track whether a free-space run is in progress */
  private volatile boolean freeInProgress = false;
  private final Lock freeSpaceLock = new ReentrantLock();

  private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();

  private final AtomicLong realCacheSize = new AtomicLong(0);
  private final AtomicLong heapSize = new AtomicLong(0);

  /** Current number of cached blocks */
  private final AtomicLong blockNumber = new AtomicLong(0);
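  /** Cache access count (sequential ID), used as an LRU-style access counter for eviction ordering */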
  private final AtomicLong accessCount = new AtomicLong(0);

  private static final int DEFAULT_CACHE_WAIT_TIME = 50;
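  // Whether cacheBlock() waits briefly for queue space rather than dropping the insert
  // when a writer queue is full; false by default, and nothing in this class sets it.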
  boolean wait_when_cache = false;

  private final BucketCacheStats cacheStats = new BucketCacheStats();

  private final String persistencePath;
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable the cache; 1 minute as default */
  private final int ioErrorsTolerationDuration;

  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
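  // Start time of the first IO error while reading or writing the IOEngine;
  // reset to -1 after a successful read/write.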
  private volatile long ioErrorStartTime = -1;
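  /**
   * A ReentrantReadWriteLock per block offset, so a block is not freed or its slot
   * reused while another thread is still reading it.
   */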
  @VisibleForTesting
  final IdReadWriteLock offsetLock = new IdReadWriteLock();

  private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
      new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
        @Override
        public int compare(BlockCacheKey a, BlockCacheKey b) {
          if (a.getOffset() == b.getOffset()) {
            return 0;
          } else if (a.getOffset() < b.getOffset()) {
            return -1;
          }
          return 1;
        }
      });
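  /** Schedules the periodic statistics logging */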
  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  /** Allocates and frees space inside the IOEngine */
  private BucketAllocator bucketAllocator;
  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
      IOException {
    this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
        persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
  }

  public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
      throws FileNotFoundException, IOException {
    this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
    this.writerThreads = new WriterThread[writerThreadNum];
    long blockNumCapacity = capacity / blockSize;
    if (blockNumCapacity >= Integer.MAX_VALUE) {
      // Enough for about 32TB of cache!
      throw new IllegalArgumentException("Cache capacity is too large; only up to 32TB is"
          + " supported now");
    }

    this.cacheCapacity = capacity;
    this.persistencePath = persistencePath;
    this.blockSize = blockSize;
    this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;

    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
    for (int i = 0; i < writerThreads.length; ++i) {
      writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
    }

    assert writerQueues.size() == writerThreads.length;
    this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();

    this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);

    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        retrieveFromFile(bucketSizes);
      } catch (IOException ioex) {
        LOG.error("Can't restore from file", ioex);
      } catch (ClassNotFoundException cnfe) {
        LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe);
        throw new RuntimeException(cnfe);
      }
    }
    final String threadName = Thread.currentThread().getName();
    this.cacheEnabled = true;
    for (int i = 0; i < writerThreads.length; ++i) {
      writerThreads[i] = new WriterThread(writerQueues.get(i));
      writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
      writerThreads[i].setDaemon(true);
    }
    startWriterThreads();

    // Run the statistics thread periodically to print the cache statistics log
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
    LOG.info("Started bucket cache; ioengine=" + ioEngineName +
        ", capacity=" + StringUtils.byteDesc(capacity) +
        ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
        writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
        persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
  }
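  /**
   * Called by the constructor to start the writer threads. Protected and visible for
   * testing so tests can override and defer thread startup.
   */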
  @VisibleForTesting
  protected void startWriterThreads() {
    for (WriterThread thread : writerThreads) {
      thread.start();
    }
  }

  @VisibleForTesting
  boolean isCacheEnabled() {
    return this.cacheEnabled;
  }

  public long getMaxSize() {
    return this.cacheCapacity;
  }

  public String getIoEngine() {
    return ioEngine.toString();
  }
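  /**
   * Get the IOEngine from the IO engine name.
   * @param ioEngineName "file:&lt;path&gt;", "offheap", or "heap"
   * @param capacity engine capacity, in bytes
   * @return the IOEngine
   */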
  private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
      throws IOException {
    if (ioEngineName.startsWith("file:"))
      return new FileIOEngine(ioEngineName.substring(5), capacity);
    else if (ioEngineName.startsWith("offheap"))
      return new ByteBufferIOEngine(capacity, true);
    else if (ioEngineName.startsWith("heap"))
      return new ByteBufferIOEngine(capacity, false);
    else
      throw new IllegalArgumentException(
          "Don't understand io engine name for cache - prefix with file:, heap or offheap");
  }
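  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param buf block buffer
   */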
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false, false);
  }
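  /**
   * Cache the block with the specified name and buffer.
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   * @param inMemory if block is in-memory
   * @param cacheDataInL1 unused here; this cache has no L1 of its own to defer to
   */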
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
      final boolean cacheDataInL1) {
    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
  }
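  /**
   * Cache the block to the RAM cache; a writer thread will later drain it to the IOEngine.
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   * @param inMemory if block is in-memory
   * @param wait if true, wait up to DEFAULT_CACHE_WAIT_TIME ms for queue space when the
   *          chosen writer queue is full, instead of failing the insert immediately
   */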
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
      boolean wait) {
    if (!cacheEnabled) {
      return;
    }

    if (backingMap.containsKey(cacheKey)) {
      return;
    }

    // Stuff the entry into the RAM cache so it can get drained to the IOEngine
    RAMQueueEntry re =
        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
    if (ramCache.putIfAbsent(cacheKey, re) != null) {
      return;
    }
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = false;
    if (wait) {
      try {
        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    } else {
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
      ramCache.remove(cacheKey);
      cacheStats.failInsert();
    } else {
      this.blockNumber.incrementAndGet();
      this.heapSize.addAndGet(cachedItem.heapSize());
      blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
    }
  }
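  /**
   * Get the buffer of the block with the specified key.
   * @param key block's cache key
   * @param caching true if the caller caches blocks on cache misses
   * @param repeat whether this is a repeat lookup for the same block
   * @param updateCacheMetrics whether we should update the cache metrics or not
   * @return buffer of the specified cache key, or null if this key is not in the cache
   */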
  @Override
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
      boolean updateCacheMetrics) {
    if (!cacheEnabled) {
      return null;
    }
    RAMQueueEntry re = ramCache.get(key);
    if (re != null) {
      if (updateCacheMetrics) {
        cacheStats.hit(caching, key.isPrimary());
      }
      re.access(accessCount.incrementAndGet());
      return re.getData();
    }
    BucketEntry bucketEntry = backingMap.get(key);
    if (bucketEntry != null) {
      long start = System.nanoTime();
      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
      try {
        lock.readLock().lock();
        // Re-check under the offset lock: the entry may have been evicted (and the
        // offset reused) between the lookup above and our acquiring the read lock.
        if (bucketEntry.equals(backingMap.get(key))) {
          int len = bucketEntry.getLength();
          ByteBuffer bb = ByteBuffer.allocate(len);
          int lenRead = ioEngine.read(bb, bucketEntry.offset());
          if (lenRead != len) {
            throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
          }
          CacheableDeserializer<Cacheable> deserializer =
              bucketEntry.deserializerReference(this.deserialiserMap);
          Cacheable cachedBlock = deserializer.deserialize(bb, true);
          long timeTaken = System.nanoTime() - start;
          if (updateCacheMetrics) {
            cacheStats.hit(caching, key.isPrimary());
            cacheStats.ioHit(timeTaken);
          }
          bucketEntry.access(accessCount.incrementAndGet());
          if (this.ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          return cachedBlock;
        }
      } catch (IOException ioex) {
        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
        checkIOErrorIsTolerated();
      } finally {
        lock.readLock().unlock();
      }
    }
    if (!repeat && updateCacheMetrics) {
      cacheStats.miss(caching, key.isPrimary());
    }
    return null;
  }
  @VisibleForTesting
  void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
    bucketAllocator.freeBlock(bucketEntry.offset());
    realCacheSize.addAndGet(-1 * bucketEntry.getLength());
    blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
    if (decrementBlockNumber) {
      this.blockNumber.decrementAndGet();
    }
  }
  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    if (!cacheEnabled) {
      return false;
    }
    RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
    if (removedBlock != null) {
      this.blockNumber.decrementAndGet();
      this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
    }
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry == null) {
      if (removedBlock != null) {
        cacheStats.evicted(0, cacheKey.isPrimary());
        return true;
      } else {
        return false;
      }
    }
    // Free the backing space under the offset write lock so no reader is mid-read
    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
    try {
      lock.writeLock().lock();
      if (backingMap.remove(cacheKey, bucketEntry)) {
        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
      } else {
        return false;
      }
    } finally {
      lock.writeLock().unlock();
    }
    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
    return true;
  }
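  /** Statistics thread: periodically prints cache statistics to the log. */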
  private static class StatisticsThread extends Thread {
    private final BucketCache bucketCache;

    public StatisticsThread(BucketCache bucketCache) {
      super("BucketCacheStatsThread");
      setDaemon(true);
      this.bucketCache = bucketCache;
    }

    @Override
    public void run() {
      bucketCache.logStats();
    }
  }
  public void logStats() {
    long totalSize = bucketAllocator.getTotalSize();
    long usedSize = bucketAllocator.getUsedSize();
    long freeSize = totalSize - usedSize;
    long cacheSize = getRealCacheSize();
    LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " +
        "totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
        "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
        "usedSize=" + StringUtils.byteDesc(usedSize) + ", " +
        "cacheSize=" + StringUtils.byteDesc(cacheSize) + ", " +
        "accesses=" + cacheStats.getRequestCount() + ", " +
        "hits=" + cacheStats.getHitCount() + ", " +
        "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
        "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit()) + ", " +
        "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
          (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", ")) +
        "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
        "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
        "cachingHitsRatio=" + (cacheStats.getHitCachingCount() == 0 ? "0," :
          (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", ")) +
        "evictions=" + cacheStats.getEvictionCount() + ", " +
        "evicted=" + cacheStats.getEvictedCount() + ", " +
        "evictedPerRun=" + cacheStats.evictedPerEviction());
    cacheStats.reset();
  }

  public long getRealCacheSize() {
    return this.realCacheSize.get();
  }

  private long acceptableSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR);
  }

  private long singleSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize()
        * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR);
  }

  private long multiSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR
        * DEFAULT_MIN_FACTOR);
  }

  private long memorySize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR
        * DEFAULT_MIN_FACTOR);
  }
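  /**
   * Return the count of bucket-size partitions whose free space is below the
   * {@code minFactor} goal, i.e. the sizes that still need space freed.
   */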
  private int bucketSizesAboveThresholdCount(float minFactor) {
    BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
    int fullCount = 0;
    for (int i = 0; i < stats.length; i++) {
      long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor));
      freeGoal = Math.max(freeGoal, 1);
      if (stats[i].freeCount() < freeGoal) {
        fullCount++;
      }
    }
    return fullCount;
  }
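  /**
   * Completely evict the blocks living in the least-filled in-use buckets so those
   * buckets become wholly free and the allocator can re-assign them to bucket sizes
   * that are under pressure.
   * @param completelyFreeBucketsNeeded number of buckets to free entirely
   */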
  private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
    if (completelyFreeBucketsNeeded != 0) {
      // Collect the index of every bucket that currently holds at least one block
      Set<Integer> inUseBuckets = new HashSet<Integer>();
      for (BucketEntry entry : backingMap.values()) {
        inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
      }

      Set<Integer> candidateBuckets = bucketAllocator.getLeastFilledBuckets(
          inUseBuckets, completelyFreeBucketsNeeded);
      for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
        if (candidateBuckets.contains(bucketAllocator
            .getBucketIndex(entry.getValue().offset()))) {
          evictBlock(entry.getKey());
        }
      }
    }
  }
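  /**
   * Free the space if the used size reaches acceptableSize() or a block of a given
   * size cannot be allocated. Evicts least-recently-accessed blocks, taking from the
   * single/multi/in-memory priority groups in proportion to how far each exceeds its
   * share of the cache.
   * @param why why the free was triggered; used only in the debug log
   */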
  private void freeSpace(final String why) {
    // Ensure only one freeSpace run at a time
    if (!freeSpaceLock.tryLock()) return;
    try {
      freeInProgress = true;
      long bytesToFreeWithoutExtra = 0;
      // Calculate the bytes to free for each bucket size that is under its goal
      StringBuffer msgBuffer = LOG.isDebugEnabled() ? new StringBuffer() : null;
      BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
      long[] bytesToFreeForBucket = new long[stats.length];
      for (int i = 0; i < stats.length; i++) {
        bytesToFreeForBucket[i] = 0;
        long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
        freeGoal = Math.max(freeGoal, 1);
        if (stats[i].freeCount() < freeGoal) {
          bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
          bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
          if (msgBuffer != null) {
            msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
                + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
          }
        }
      }
      if (msgBuffer != null) {
        msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
      }

      if (bytesToFreeWithoutExtra <= 0) {
        return;
      }
      long currentSize = bucketAllocator.getUsedSize();
      long totalSize = bucketAllocator.getTotalSize();
      if (LOG.isDebugEnabled() && msgBuffer != null) {
        LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
            " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
            StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize));
      }

      long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
          * (1 + DEFAULT_EXTRA_FREE_FACTOR));

      // Instantiate the priority groups
      BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
          blockSize, singleSize());
      BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
          blockSize, multiSize());
      BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
          blockSize, memorySize());

      // Scan the entire cache and sort entries into the priority groups
      for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
        switch (bucketEntryWithKey.getValue().getPriority()) {
          case SINGLE: {
            bucketSingle.add(bucketEntryWithKey);
            break;
          }
          case MULTI: {
            bucketMulti.add(bucketEntryWithKey);
            break;
          }
          case MEMORY: {
            bucketMemory.add(bucketEntryWithKey);
            break;
          }
        }
      }

      PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3);

      bucketQueue.add(bucketSingle);
      bucketQueue.add(bucketMulti);
      bucketQueue.add(bucketMemory);

      int remainingBuckets = 3;
      long bytesFreed = 0;

      // First pass: visit groups in ascending order of overflow; each group gives up
      // at most its overflow beyond its configured share
      BucketEntryGroup bucketGroup;
      while ((bucketGroup = bucketQueue.poll()) != null) {
        long overflow = bucketGroup.overflow();
        if (overflow > 0) {
          long bucketBytesToFree = Math.min(overflow,
              (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
          bytesFreed += bucketGroup.free(bucketBytesToFree);
        }
        remainingBuckets--;
      }

      // Second pass: if some bucket sizes still need space, free evenly from all groups
      if (bucketSizesAboveThresholdCount(DEFAULT_MIN_FACTOR) > 0) {
        bucketQueue.clear();
        remainingBuckets = 3;

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        while ((bucketGroup = bucketQueue.poll()) != null) {
          long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
          bytesFreed += bucketGroup.free(bucketBytesToFree);
          remainingBuckets--;
        }
      }

      // Even after the above, some bucket sizes may still be full: free entire buckets
      // so the allocator can convert them to the sizes that need space
      freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR *
          bucketSizesAboveThresholdCount(1.0f));

      if (LOG.isDebugEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.debug("Bucket cache free space completed; " + "freed="
            + StringUtils.byteDesc(bytesFreed) + ", " + "total="
            + StringUtils.byteDesc(totalSize) + ", " + "single="
            + StringUtils.byteDesc(single) + ", " + "multi="
            + StringUtils.byteDesc(multi) + ", " + "memory="
            + StringUtils.byteDesc(memory));
      }

    } catch (Throwable t) {
      LOG.warn("Failed freeing space", t);
    } finally {
      cacheStats.evict();
      freeInProgress = false;
      freeSpaceLock.unlock();
    }
  }
  @VisibleForTesting
  class WriterThread extends HasThread {
    private final BlockingQueue<RAMQueueEntry> inputQueue;
    private volatile boolean writerEnabled = true;

    WriterThread(BlockingQueue<RAMQueueEntry> queue) {
      super("BucketCacheWriterThread");
      this.inputQueue = queue;
    }

    // Used by tests
    @VisibleForTesting
    void disableWriter() {
      this.writerEnabled = false;
    }
    public void run() {
      List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
      try {
        while (cacheEnabled && writerEnabled) {
          try {
            try {
              // Blocks on the queue until at least one entry is available
              entries = getRAMQueueEntries(inputQueue, entries);
            } catch (InterruptedException ie) {
              if (!cacheEnabled) break;
            }
            doDrain(entries);
          } catch (Exception ioe) {
            LOG.error("WriterThread encountered error", ioe);
          }
        }
      } catch (Throwable t) {
        LOG.warn("Failed doing drain", t);
      }
      LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
    }
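    /**
     * Flush the entries in ramCache to the IOEngine and add bucket entries to backingMap.
     * Process all that are passed in, even on failure, being sure to remove them from
     * ramCache; otherwise we would never release their references.
     * @param entries presumed to be processed by this invocation only; no interference expected
     */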
    @VisibleForTesting
    void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
      if (entries.isEmpty()) {
        return;
      }

      // A failed write leaves bucketEntries[i] null, so the bookkeeping pass below can
      // tell which entries actually made it to the engine.
      final int size = entries.size();
      BucketEntry[] bucketEntries = new BucketEntry[size];

      // Index of the entry we are currently writing
      int index = 0;
      while (cacheEnabled && index < size) {
        RAMQueueEntry re = null;
        try {
          re = entries.get(index);
          if (re == null) {
            LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
            index++;
            continue;
          }
          BucketEntry bucketEntry =
              re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
          // Successfully written; keep the bucket entry for the bookkeeping pass
          bucketEntries[index] = bucketEntry;
          if (ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          index++;
        } catch (BucketAllocatorException fle) {
          LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
          // Skip the entry; it will not make it into the backingMap
          bucketEntries[index] = null;
          index++;
        } catch (CacheFullException cfe) {
          // Cache is full: free space and retry the same entry
          if (!freeInProgress) {
            freeSpace("Full!");
          } else {
            Thread.sleep(50);
          }
        } catch (IOException ioex) {
          // Hopefully transient; the same entry is retried on the next loop iteration
          LOG.error("Failed writing to bucket cache", ioex);
          checkIOErrorIsTolerated();
        }
      }

      // Make sure data pages are written on media before we update the maps
      try {
        ioEngine.sync();
      } catch (IOException ioex) {
        LOG.error("Failed syncing IO engine", ioex);
        checkIOErrorIsTolerated();
        // Since the sync failed, free the written blocks back to the allocator
        for (int i = 0; i < entries.size(); ++i) {
          if (bucketEntries[i] != null) {
            bucketAllocator.freeBlock(bucketEntries[i].offset());
            bucketEntries[i] = null;
          }
        }
      }

      // Now add to backingMap if successfully written to the bucket cache, and remove
      // the entries from ramCache
      for (int i = 0; i < size; ++i) {
        BlockCacheKey key = entries.get(i).getKey();
        // Only add if non-null entry
        if (bucketEntries[i] != null) {
          backingMap.put(key, bucketEntries[i]);
        }
        // Always remove from ramCache, even if we failed adding it to the backingMap
        RAMQueueEntry ramCacheEntry = ramCache.remove(key);
        if (ramCacheEntry != null) {
          heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
        } else if (bucketEntries[i] != null) {
          // The block was evicted from ramCache while we were writing it out: undo the
          // backingMap insert so the eviction is not silently reversed
          ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
          try {
            lock.writeLock().lock();
            if (backingMap.remove(key, bucketEntries[i])) {
              blockEvicted(key, bucketEntries[i], false);
            }
          } finally {
            lock.writeLock().unlock();
          }
        }
      }

      long used = bucketAllocator.getUsedSize();
      if (used > acceptableSize()) {
        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
      }
    }
  }
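  /**
   * Blocks until elements are available in {@code q}, then grabs as many as possible
   * before returning.
   * @param q the queue to take from
   * @param receptical where the results go; cleared before use
   * @return receptical laden with elements taken from the queue
   */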
  @VisibleForTesting
  static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
      final List<RAMQueueEntry> receptical)
      throws InterruptedException {
    receptical.clear();
    receptical.add(q.take());
    q.drainTo(receptical);
    return receptical;
  }
  private void persistToFile() throws IOException {
    assert !cacheEnabled;
    FileOutputStream fos = null;
    ObjectOutputStream oos = null;
    try {
      if (!ioEngine.isPersistent())
        throw new IOException(
            "Attempt to persist non-persistent cache mappings!");
      fos = new FileOutputStream(persistencePath, false);
      oos = new ObjectOutputStream(fos);
      oos.writeLong(cacheCapacity);
      oos.writeUTF(ioEngine.getClass().getName());
      oos.writeUTF(backingMap.getClass().getName());
      oos.writeObject(deserialiserMap);
      oos.writeObject(backingMap);
    } finally {
      if (oos != null) oos.close();
      if (fos != null) fos.close();
    }
  }
  @SuppressWarnings("unchecked")
  private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
      ClassNotFoundException {
    File persistenceFile = new File(persistencePath);
    if (!persistenceFile.exists()) {
      return;
    }
    assert !cacheEnabled;
    FileInputStream fis = null;
    ObjectInputStream ois = null;
    try {
      if (!ioEngine.isPersistent())
        throw new IOException(
            "Attempt to restore non-persistent cache mappings!");
      fis = new FileInputStream(persistencePath);
      ois = new ObjectInputStream(fis);
      long capacitySize = ois.readLong();
      if (capacitySize != cacheCapacity)
        throw new IOException("Mismatched cache capacity:"
            + StringUtils.byteDesc(capacitySize) + ", expected: "
            + StringUtils.byteDesc(cacheCapacity));
      String ioclass = ois.readUTF();
      String mapclass = ois.readUTF();
      if (!ioEngine.getClass().getName().equals(ioclass))
        throw new IOException("Class name for IO engine mismatch: " + ioclass
            + ", expected:" + ioEngine.getClass().getName());
      if (!backingMap.getClass().getName().equals(mapclass))
        throw new IOException("Class name for cache map mismatch: " + mapclass
            + ", expected:" + backingMap.getClass().getName());
      UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
          .readObject();
      ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
          (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
      BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
          backingMapFromFile, realCacheSize);
      bucketAllocator = allocator;
      deserialiserMap = deserMap;
      backingMap = backingMapFromFile;
    } finally {
      if (ois != null) ois.close();
      if (fis != null) fis.close();
      if (!persistenceFile.delete()) {
        throw new IOException("Failed deleting persistence file "
            + persistenceFile.getAbsolutePath());
      }
    }
  }
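  /**
   * Check whether we tolerate the IO error this time. If the duration of the IOEngine
   * throwing errors exceeds ioErrorsTolerationDuration, we disable the cache.
   */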
  private void checkIOErrorIsTolerated() {
    long now = EnvironmentEdgeManager.currentTime();
    if (this.ioErrorStartTime > 0) {
      if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
        LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
            "ms, disabling cache, please check your IOEngine");
        disableCache();
      }
    } else {
      this.ioErrorStartTime = now;
    }
  }
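  /**
   * Used to shut down the cache, or to turn it off in the case of something broken.
   */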
  private void disableCache() {
    if (!cacheEnabled)
      return;
    cacheEnabled = false;
    ioEngine.shutdown();
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < writerThreads.length; ++i)
      writerThreads[i].interrupt();
    this.ramCache.clear();
    if (!ioEngine.isPersistent() || persistencePath == null) {
      this.backingMap.clear();
    }
  }

  private void join() throws InterruptedException {
    for (int i = 0; i < writerThreads.length; ++i)
      writerThreads[i].join();
  }
  @Override
  public void shutdown() {
    disableCache();
    LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
        + "; path to write=" + persistencePath);
    if (ioEngine.isPersistent() && persistencePath != null) {
      try {
        join();
        persistToFile();
      } catch (IOException ex) {
        LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
      } catch (InterruptedException e) {
        LOG.warn("Failed to persist data on exit", e);
      }
    }
  }

  @Override
  public CacheStats getStats() {
    return cacheStats;
  }

  public BucketAllocator getAllocator() {
    return this.bucketAllocator;
  }

  @Override
  public long heapSize() {
    return this.heapSize.get();
  }

  @Override
  public long size() {
    return this.realCacheSize.get();
  }

  @Override
  public long getFreeSize() {
    return this.bucketAllocator.getFreeSize();
  }

  @Override
  public long getBlockCount() {
    return this.blockNumber.get();
  }

  @Override
  public long getCurrentSize() {
    return this.bucketAllocator.getUsedSize();
  }
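  /**
   * Evicts all blocks for a specific HFile; used for evict-on-close. The keys are
   * looked up in the blocksByHFile index, so this does not scan the whole cache.
   * @return the number of blocks evicted
   */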
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName);
    if (keySet == null) {
      return 0;
    }
    int numEvicted = 0;
    List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
    for (BlockCacheKey key : keysForHFile) {
      if (evictBlock(key)) {
        ++numEvicted;
      }
    }

    return numEvicted;
  }
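  /**
   * Item in cache. We expect this to be where most memory goes, so the fields are kept
   * small: the offset into the IOEngine is packed into an int plus one byte. Offsets
   * must be 256-byte aligned (the low 8 bits are dropped), giving 40 significant bits,
   * enough to address 2^48 bytes of cache.
   */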
  static class BucketEntry implements Serializable {
    private static final long serialVersionUID = -6741504807982257534L;

    // Most-recently-accessed entries sort first
    static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {

      @Override
      public int compare(BucketEntry o1, BucketEntry o2) {
        return Long.compare(o2.accessCounter, o1.accessCounter);
      }
    };

    private int offsetBase;
    private int length;
    private byte offset1;
    byte deserialiserIndex;
    private volatile long accessCounter;
    private BlockPriority priority;

    /**
     * Time this block was cached. Presumes we are created just before we are added to the cache.
     */
    private final long cachedTime = System.nanoTime();

    BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
      setOffset(offset);
      this.length = length;
      this.accessCounter = accessCounter;
      if (inMemory) {
        this.priority = BlockPriority.MEMORY;
      } else {
        this.priority = BlockPriority.SINGLE;
      }
    }

    long offset() {
      // Reassemble the 40 stored bits, then shift back up by the 8 alignment bits
      long o = ((long) offsetBase) & 0xFFFFFFFFL;
      o += (((long) (offset1)) & 0xFF) << 32;
      return o << 8;
    }

    private void setOffset(long value) {
      assert (value & 0xFF) == 0;
      value >>= 8;
      offsetBase = (int) value;
      offset1 = (byte) (value >> 32);
    }

    public int getLength() {
      return length;
    }

    protected CacheableDeserializer<Cacheable> deserializerReference(
        UniqueIndexMap<Integer> deserialiserMap) {
      return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
          .unmap(deserialiserIndex));
    }

    protected void setDeserialiserReference(
        CacheableDeserializer<Cacheable> deserializer,
        UniqueIndexMap<Integer> deserialiserMap) {
      this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
          .getDeserialiserIdentifier()));
    }

    /**
     * Block has been accessed: update the access counter and promote a
     * single-access block to the multi-access priority.
     */
    public void access(long accessCounter) {
      this.accessCounter = accessCounter;
      if (this.priority == BlockPriority.SINGLE) {
        this.priority = BlockPriority.MULTI;
      }
    }

    public BlockPriority getPriority() {
      return this.priority;
    }

    public long getCachedTime() {
      return cachedTime;
    }
  }
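  /**
   * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup
   * for each priority (single, multi, memory). Once bucketed, the eviction algorithm
   * takes the appropriate number of elements out of each according to its overflow
   * beyond the group's share of the cache.
   */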
  private class BucketEntryGroup implements Comparable<BucketEntryGroup> {

    private CachedEntryQueue queue;
    private long totalSize = 0;
    private long bucketSize;

    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
      this.bucketSize = bucketSize;
      queue = new CachedEntryQueue(bytesToFree, blockSize);
      totalSize = 0;
    }

    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
      totalSize += block.getValue().getLength();
      queue.add(block);
    }

    public long free(long toFree) {
      Map.Entry<BlockCacheKey, BucketEntry> entry;
      long freedBytes = 0;
      while ((entry = queue.pollLast()) != null) {
        evictBlock(entry.getKey());
        freedBytes += entry.getValue().getLength();
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      return freedBytes;
    }

    public long overflow() {
      return totalSize - bucketSize;
    }

    public long totalSize() {
      return totalSize;
    }

    @Override
    public int compareTo(BucketEntryGroup that) {
      return Long.compare(this.overflow(), that.overflow());
    }

    @Override
    public boolean equals(Object that) {
      return this == that;
    }
  }
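  /**
   * Block entry stored in the RAM cache, holding the key, the data, and an access counter.
   */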
  @VisibleForTesting
  static class RAMQueueEntry {
    private BlockCacheKey key;
    private Cacheable data;
    private long accessCounter;
    private boolean inMemory;

    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
        boolean inMemory) {
      this.key = bck;
      this.data = data;
      this.accessCounter = accessCounter;
      this.inMemory = inMemory;
    }

    public Cacheable getData() {
      return data;
    }

    public BlockCacheKey getKey() {
      return key;
    }

    public void access(long accessCounter) {
      this.accessCounter = accessCounter;
    }

    public BucketEntry writeToCache(final IOEngine ioEngine,
        final BucketAllocator bucketAllocator,
        final UniqueIndexMap<Integer> deserialiserMap,
        final AtomicLong realCacheSize) throws CacheFullException, IOException,
        BucketAllocatorException {
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized
      if (len == 0) return null;
      long offset = bucketAllocator.allocateBlock(len);
      BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
      try {
        if (data instanceof HFileBlock) {
          // HFileBlocks are written without an extra copy: the block's own buffer first,
          // then the extra serialization info at the tail of the allocated slot
          HFileBlock block = (HFileBlock) data;
          ByteBuffer sliceBuf = block.getBufferReadOnlyWithHeader();
          sliceBuf.rewind();
          assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE ||
              len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
          ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
          block.serializeExtraInfo(extraInfoBuffer);
          ioEngine.write(sliceBuf, offset);
          ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
        } else {
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb);
          ioEngine.write(bb, offset);
        }
      } catch (IOException ioe) {
        // The write never completed; free the allocated slot before rethrowing
        bucketAllocator.freeBlock(offset);
        throw ioe;
      }

      realCacheSize.addAndGet(len);
      return bucketEntry;
    }
  }
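  /**
   * Disables the writer threads, interrupts them, and waits for them to finish.
   * Used by tests.
   */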
  void stopWriterThreads() throws InterruptedException {
    for (WriterThread writerThread : writerThreads) {
      writerThread.disableWriter();
      writerThread.interrupt();
      writerThread.join();
    }
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    // Don't bother with the ramCache; blocks live there only briefly before being drained
    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
        this.backingMap.entrySet().iterator();
    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public CachedBlock next() {
        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return e.getValue().getPriority();
          }

          @Override
          public BlockType getBlockType() {
            // Not held by the BucketEntry, so not available
            return null;
          }

          @Override
          public long getOffset() {
            return e.getKey().getOffset();
          }

          @Override
          public long getSize() {
            return e.getValue().getLength();
          }

          @Override
          public long getCachedTime() {
            return e.getValue().getCachedTime();
          }

          @Override
          public String getFilename() {
            return e.getKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) return diff;

            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) return diff;
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException("" + this.getCachedTime() + ", " +
                  other.getCachedTime());
            }
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          @Override
          public int hashCode() {
            return e.getKey().hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock) obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }
}