1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.io.hfile.bucket;
22
23 import java.io.File;
24 import java.io.FileInputStream;
25 import java.io.FileNotFoundException;
26 import java.io.FileOutputStream;
27 import java.io.IOException;
28 import java.io.ObjectInputStream;
29 import java.io.ObjectOutputStream;
30 import java.io.Serializable;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.Comparator;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.PriorityQueue;
38 import java.util.Set;
39 import java.util.concurrent.ArrayBlockingQueue;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicLong;
47 import java.util.concurrent.locks.Lock;
48 import java.util.concurrent.locks.ReentrantLock;
49
50 import org.apache.commons.logging.Log;
51 import org.apache.commons.logging.LogFactory;
52 import org.apache.hadoop.hbase.classification.InterfaceAudience;
53 import org.apache.hadoop.hbase.io.HeapSize;
54 import org.apache.hadoop.hbase.io.hfile.BlockCache;
55 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
56 import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
57 import org.apache.hadoop.hbase.io.hfile.BlockPriority;
58 import org.apache.hadoop.hbase.io.hfile.BlockType;
59 import org.apache.hadoop.hbase.io.hfile.CacheStats;
60 import org.apache.hadoop.hbase.io.hfile.Cacheable;
61 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
62 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
63 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
64 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
65 import org.apache.hadoop.hbase.util.ConcurrentIndex;
66 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
67 import org.apache.hadoop.hbase.util.HasThread;
68 import org.apache.hadoop.hbase.util.IdLock;
69 import org.apache.hadoop.util.StringUtils;
70
71 import com.google.common.annotations.VisibleForTesting;
72 import com.google.common.collect.ImmutableList;
73 import com.google.common.util.concurrent.ThreadFactoryBuilder;
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 @InterfaceAudience.Private
94 public class BucketCache implements BlockCache, HeapSize {
95 static final Log LOG = LogFactory.getLog(BucketCache.class);
96
97
98 private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
99 private static final float DEFAULT_MULTI_FACTOR = 0.50f;
100 private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
101 private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
102
103 private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;
104 private static final float DEFAULT_MIN_FACTOR = 0.85f;
105
106
107 private static final int statThreadPeriod = 5 * 60;
108
109 final static int DEFAULT_WRITER_THREADS = 3;
110 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
111
112
113 final IOEngine ioEngine;
114
115
116 @VisibleForTesting
117 final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
118
119 @VisibleForTesting
120 ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
121
122
123
124
125
126
127 private volatile boolean cacheEnabled;
128
129
130
131
132
133
134
135
136 @VisibleForTesting
137 final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
138 new ArrayList<BlockingQueue<RAMQueueEntry>>();
139 @VisibleForTesting
140 final WriterThread[] writerThreads;
141
142
143 private volatile boolean freeInProgress = false;
144 private final Lock freeSpaceLock = new ReentrantLock();
145
146 private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();
147
148 private final AtomicLong realCacheSize = new AtomicLong(0);
149 private final AtomicLong heapSize = new AtomicLong(0);
150
151 private final AtomicLong blockNumber = new AtomicLong(0);
152 private final AtomicLong failedBlockAdditions = new AtomicLong(0);
153
154
155 private final AtomicLong accessCount = new AtomicLong(0);
156
157 private static final int DEFAULT_CACHE_WAIT_TIME = 50;
158
159
160
161 boolean wait_when_cache = false;
162
163 private final BucketCacheStats cacheStats = new BucketCacheStats();
164
165 private final String persistencePath;
166 private final long cacheCapacity;
167
168 private final long blockSize;
169
170
171 private final int ioErrorsTolerationDuration;
172
173 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
174
175
176
177 private volatile long ioErrorStartTime = -1;
178
179
180
181
182
183
184
185
186 @VisibleForTesting
187 final IdLock offsetLock = new IdLock();
188
189 private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
190 new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
191 @Override
192 public int compare(BlockCacheKey a, BlockCacheKey b) {
193 if (a.getOffset() == b.getOffset()) {
194 return 0;
195 } else if (a.getOffset() < b.getOffset()) {
196 return -1;
197 }
198 return 1;
199 }
200 });
201
202
203 private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
204 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());
205
206
207 private BucketAllocator bucketAllocator;
208
/**
 * Creates a bucket cache that tolerates IO errors for
 * {@link #DEFAULT_ERROR_TOLERATION_DURATION} milliseconds before disabling itself.
 * All arguments are forwarded unchanged to the eight-argument constructor.
 */
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath)
    throws FileNotFoundException, IOException {
  this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
      persistencePath, DEFAULT_ERROR_TOLERATION_DURATION);
}
215
/**
 * Builds the cache, optionally restores a persisted index, and starts the writer threads and
 * the periodic statistics task.
 *
 * @param ioEngineName engine selector: {@code file:<path>}, {@code offheap} or {@code heap}
 * @param capacity total cache capacity
 * @param blockSize block size; {@code capacity / blockSize} must stay below
 *          {@link Integer#MAX_VALUE}
 * @param bucketSizes bucket sizes handed to the {@link BucketAllocator}
 * @param writerThreadNum number of writer threads (and writer queues)
 * @param writerQLen capacity of each writer queue
 * @param persistencePath file to restore the index from, or null for no persistence
 * @param ioErrorsTolerationDuration milliseconds IO errors are tolerated before disabling
 * @throws IllegalArgumentException if the capacity is too large or the engine name is unknown
 */
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
    throws FileNotFoundException, IOException {
  this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
  this.writerThreads = new WriterThread[writerThreadNum];
  final long maxBlockCount = capacity / blockSize;
  if (maxBlockCount >= Integer.MAX_VALUE) {
    throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now");
  }

  this.cacheCapacity = capacity;
  this.persistencePath = persistencePath;
  this.blockSize = blockSize;
  this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;

  this.bucketAllocator = new BucketAllocator(capacity, bucketSizes);
  for (int q = 0; q < writerThreads.length; q++) {
    writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
  }
  assert writerQueues.size() == writerThreads.length;

  this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();
  this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) maxBlockCount);

  // A restore failure caused by IO is logged and the cache simply starts empty.
  final boolean restorable = ioEngine.isPersistent() && persistencePath != null;
  if (restorable) {
    try {
      retrieveFromFile(bucketSizes);
    } catch (IOException ioex) {
      LOG.error("Can't restore from file because of", ioex);
    } catch (ClassNotFoundException cnfe) {
      LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
      throw new RuntimeException(cnfe);
    }
  }

  // Writers are named after the constructing thread and only started once the cache is enabled.
  final String parentThreadName = Thread.currentThread().getName();
  this.cacheEnabled = true;
  for (int w = 0; w < writerThreads.length; w++) {
    writerThreads[w] = new WriterThread(writerQueues.get(w));
    writerThreads[w].setName(parentThreadName + "-BucketCacheWriter-" + w);
    writerThreads[w].setDaemon(true);
  }
  startWriterThreads();

  this.scheduleThreadPool.scheduleAtFixedRate(
      new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
  LOG.info("Started bucket cache; ioengine=" + ioEngineName +
      ", capacity=" + StringUtils.byteDesc(capacity) +
      ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
      writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
      persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
}
272
273
274
275
276
277 @VisibleForTesting
278 protected void startWriterThreads() {
279 for (WriterThread thread : writerThreads) {
280 thread.start();
281 }
282 }
283
284 @VisibleForTesting
285 boolean isCacheEnabled() {
286 return this.cacheEnabled;
287 }
288
289 public long getMaxSize() {
290 return this.cacheCapacity;
291 }
292
293 public String getIoEngine() {
294 return ioEngine.toString();
295 }
296
297
298
299
300
301
302
303
304 private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
305 throws IOException {
306 if (ioEngineName.startsWith("file:"))
307 return new FileIOEngine(ioEngineName.substring(5), capacity);
308 else if (ioEngineName.startsWith("offheap"))
309 return new ByteBufferIOEngine(capacity, true);
310 else if (ioEngineName.startsWith("heap"))
311 return new ByteBufferIOEngine(capacity, false);
312 else
313 throw new IllegalArgumentException(
314 "Don't understand io engine name for cache - prefix with file:, heap or offheap");
315 }
316
317
318
319
320
321
322 @Override
323 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
324 cacheBlock(cacheKey, buf, false, false);
325 }
326
327
328
329
330
331
332
333
334 @Override
335 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
336 final boolean cacheDataInL1) {
337 cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
338 }
339
340
341
342
343
344
345
346
347 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
348 boolean wait) {
349 if (!cacheEnabled) {
350 return;
351 }
352
353 if (backingMap.containsKey(cacheKey)) {
354 return;
355 }
356
357
358
359
360 RAMQueueEntry re =
361 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
362 if (ramCache.putIfAbsent(cacheKey, re) != null) {
363 return;
364 }
365 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
366 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
367 boolean successfulAddition = false;
368 if (wait) {
369 try {
370 successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
371 } catch (InterruptedException e) {
372 Thread.currentThread().interrupt();
373 }
374 } else {
375 successfulAddition = bq.offer(re);
376 }
377 if (!successfulAddition) {
378 ramCache.remove(cacheKey);
379 failedBlockAdditions.incrementAndGet();
380 } else {
381 this.blockNumber.incrementAndGet();
382 this.heapSize.addAndGet(cachedItem.heapSize());
383 blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
384 }
385 }
386
387
388
389
390
391
392
393
394
395 @Override
396 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
397 boolean updateCacheMetrics) {
398 if (!cacheEnabled) {
399 return null;
400 }
401 RAMQueueEntry re = ramCache.get(key);
402 if (re != null) {
403 if (updateCacheMetrics) {
404 cacheStats.hit(caching);
405 }
406 re.access(accessCount.incrementAndGet());
407 return re.getData();
408 }
409 BucketEntry bucketEntry = backingMap.get(key);
410 if (bucketEntry != null) {
411 long start = System.nanoTime();
412 IdLock.Entry lockEntry = null;
413 try {
414 lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
415
416
417
418 if (bucketEntry.equals(backingMap.get(key))) {
419 int len = bucketEntry.getLength();
420 ByteBuffer bb = ByteBuffer.allocate(len);
421 int lenRead = ioEngine.read(bb, bucketEntry.offset());
422 if (lenRead != len) {
423 throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
424 }
425 CacheableDeserializer<Cacheable> deserializer =
426 bucketEntry.deserializerReference(this.deserialiserMap);
427 Cacheable cachedBlock = deserializer.deserialize(bb, true);
428 long timeTaken = System.nanoTime() - start;
429 if (updateCacheMetrics) {
430 cacheStats.hit(caching);
431 cacheStats.ioHit(timeTaken);
432 }
433 bucketEntry.access(accessCount.incrementAndGet());
434 if (this.ioErrorStartTime > 0) {
435 ioErrorStartTime = -1;
436 }
437 return cachedBlock;
438 }
439 } catch (IOException ioex) {
440 LOG.error("Failed reading block " + key + " from bucket cache", ioex);
441 checkIOErrorIsTolerated();
442 } finally {
443 if (lockEntry != null) {
444 offsetLock.releaseLockEntry(lockEntry);
445 }
446 }
447 }
448 if (!repeat && updateCacheMetrics) {
449 cacheStats.miss(caching);
450 }
451 return null;
452 }
453
454 @VisibleForTesting
455 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
456 bucketAllocator.freeBlock(bucketEntry.offset());
457 realCacheSize.addAndGet(-1 * bucketEntry.getLength());
458 blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
459 if (decrementBlockNumber) {
460 this.blockNumber.decrementAndGet();
461 }
462 }
463
464 @Override
465 public boolean evictBlock(BlockCacheKey cacheKey) {
466 if (!cacheEnabled) {
467 return false;
468 }
469 RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
470 if (removedBlock != null) {
471 this.blockNumber.decrementAndGet();
472 this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize());
473 }
474 BucketEntry bucketEntry = backingMap.get(cacheKey);
475 if (bucketEntry == null) {
476 if (removedBlock != null) {
477 cacheStats.evicted(0);
478 return true;
479 } else {
480 return false;
481 }
482 }
483 IdLock.Entry lockEntry = null;
484 try {
485 lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
486 if (backingMap.remove(cacheKey, bucketEntry)) {
487 blockEvicted(cacheKey, bucketEntry, removedBlock == null);
488 } else {
489 return false;
490 }
491 } catch (IOException ie) {
492 LOG.warn("Failed evicting block " + cacheKey);
493 return false;
494 } finally {
495 if (lockEntry != null) {
496 offsetLock.releaseLockEntry(lockEntry);
497 }
498 }
499 cacheStats.evicted(bucketEntry.getCachedTime());
500 return true;
501 }
502
503
504
505
506 private static class StatisticsThread extends Thread {
507 private final BucketCache bucketCache;
508
509 public StatisticsThread(BucketCache bucketCache) {
510 super("BucketCacheStatsThread");
511 setDaemon(true);
512 this.bucketCache = bucketCache;
513 }
514
515 @Override
516 public void run() {
517 bucketCache.logStats();
518 }
519 }
520
521 public void logStats() {
522 long totalSize = bucketAllocator.getTotalSize();
523 long usedSize = bucketAllocator.getUsedSize();
524 long freeSize = totalSize - usedSize;
525 long cacheSize = getRealCacheSize();
526 LOG.info("failedBlockAdditions=" + getFailedBlockAdditions() + ", " +
527 "totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
528 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
529 "usedSize=" + StringUtils.byteDesc(usedSize) +", " +
530 "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " +
531 "accesses=" + cacheStats.getRequestCount() + ", " +
532 "hits=" + cacheStats.getHitCount() + ", " +
533 "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " +
534 "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " +
535 "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," :
536 (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) +
537 "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " +
538 "cachingHits=" + cacheStats.getHitCachingCount() + ", " +
539 "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," :
540 (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) +
541 "evictions=" + cacheStats.getEvictionCount() + ", " +
542 "evicted=" + cacheStats.getEvictedCount() + ", " +
543 "evictedPerRun=" + cacheStats.evictedPerEviction());
544 cacheStats.reset();
545 }
546
547 public long getFailedBlockAdditions() {
548 return this.failedBlockAdditions.get();
549 }
550
551 public long getRealCacheSize() {
552 return this.realCacheSize.get();
553 }
554
555 private long acceptableSize() {
556 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_ACCEPT_FACTOR);
557 }
558
559 private long singleSize() {
560 return (long) Math.floor(bucketAllocator.getTotalSize()
561 * DEFAULT_SINGLE_FACTOR * DEFAULT_MIN_FACTOR);
562 }
563
564 private long multiSize() {
565 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MULTI_FACTOR
566 * DEFAULT_MIN_FACTOR);
567 }
568
569 private long memorySize() {
570 return (long) Math.floor(bucketAllocator.getTotalSize() * DEFAULT_MEMORY_FACTOR
571 * DEFAULT_MIN_FACTOR);
572 }
573
574
575
576
577
578
579
580 private void freeSpace(final String why) {
581
582 if (!freeSpaceLock.tryLock()) return;
583 try {
584 freeInProgress = true;
585 long bytesToFreeWithoutExtra = 0;
586
587 StringBuffer msgBuffer = LOG.isDebugEnabled()? new StringBuffer(): null;
588 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
589 long[] bytesToFreeForBucket = new long[stats.length];
590 for (int i = 0; i < stats.length; i++) {
591 bytesToFreeForBucket[i] = 0;
592 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
593 freeGoal = Math.max(freeGoal, 1);
594 if (stats[i].freeCount() < freeGoal) {
595 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
596 bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
597 if (msgBuffer != null) {
598 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
599 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
600 }
601 }
602 }
603 if (msgBuffer != null) {
604 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
605 }
606
607 if (bytesToFreeWithoutExtra <= 0) {
608 return;
609 }
610 long currentSize = bucketAllocator.getUsedSize();
611 long totalSize=bucketAllocator.getTotalSize();
612 if (LOG.isDebugEnabled() && msgBuffer != null) {
613 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
614 " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
615 StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize));
616 }
617
618 long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
619 * (1 + DEFAULT_EXTRA_FREE_FACTOR));
620
621
622 BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra,
623 blockSize, singleSize());
624 BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra,
625 blockSize, multiSize());
626 BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra,
627 blockSize, memorySize());
628
629
630
631 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
632 switch (bucketEntryWithKey.getValue().getPriority()) {
633 case SINGLE: {
634 bucketSingle.add(bucketEntryWithKey);
635 break;
636 }
637 case MULTI: {
638 bucketMulti.add(bucketEntryWithKey);
639 break;
640 }
641 case MEMORY: {
642 bucketMemory.add(bucketEntryWithKey);
643 break;
644 }
645 }
646 }
647
648 PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<BucketEntryGroup>(3);
649
650 bucketQueue.add(bucketSingle);
651 bucketQueue.add(bucketMulti);
652 bucketQueue.add(bucketMemory);
653
654 int remainingBuckets = 3;
655 long bytesFreed = 0;
656
657 BucketEntryGroup bucketGroup;
658 while ((bucketGroup = bucketQueue.poll()) != null) {
659 long overflow = bucketGroup.overflow();
660 if (overflow > 0) {
661 long bucketBytesToFree = Math.min(overflow,
662 (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets);
663 bytesFreed += bucketGroup.free(bucketBytesToFree);
664 }
665 remainingBuckets--;
666 }
667
668
669
670
671
672 stats = bucketAllocator.getIndexStatistics();
673 boolean needFreeForExtra = false;
674 for (int i = 0; i < stats.length; i++) {
675 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
676 freeGoal = Math.max(freeGoal, 1);
677 if (stats[i].freeCount() < freeGoal) {
678 needFreeForExtra = true;
679 break;
680 }
681 }
682
683 if (needFreeForExtra) {
684 bucketQueue.clear();
685 remainingBuckets = 2;
686
687 bucketQueue.add(bucketSingle);
688 bucketQueue.add(bucketMulti);
689
690 while ((bucketGroup = bucketQueue.poll()) != null) {
691 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
692 bytesFreed += bucketGroup.free(bucketBytesToFree);
693 remainingBuckets--;
694 }
695 }
696
697 if (LOG.isDebugEnabled()) {
698 long single = bucketSingle.totalSize();
699 long multi = bucketMulti.totalSize();
700 long memory = bucketMemory.totalSize();
701 if (LOG.isDebugEnabled()) {
702 LOG.debug("Bucket cache free space completed; " + "freed="
703 + StringUtils.byteDesc(bytesFreed) + ", " + "total="
704 + StringUtils.byteDesc(totalSize) + ", " + "single="
705 + StringUtils.byteDesc(single) + ", " + "multi="
706 + StringUtils.byteDesc(multi) + ", " + "memory="
707 + StringUtils.byteDesc(memory));
708 }
709 }
710
711 } finally {
712 cacheStats.evict();
713 freeInProgress = false;
714 freeSpaceLock.unlock();
715 }
716 }
717
718
719 @VisibleForTesting
720 class WriterThread extends HasThread {
721 private final BlockingQueue<RAMQueueEntry> inputQueue;
722 private volatile boolean writerEnabled = true;
723
724 WriterThread(BlockingQueue<RAMQueueEntry> queue) {
725 this.inputQueue = queue;
726 }
727
728
729 @VisibleForTesting
730 void disableWriter() {
731 this.writerEnabled = false;
732 }
733
734 public void run() {
735 List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
736 try {
737 while (cacheEnabled && writerEnabled) {
738 try {
739 try {
740
741 entries = getRAMQueueEntries(inputQueue, entries);
742 } catch (InterruptedException ie) {
743 if (!cacheEnabled) break;
744 }
745 doDrain(entries);
746 } catch (Exception ioe) {
747 LOG.error("WriterThread encountered error", ioe);
748 }
749 }
750 } catch (Throwable t) {
751 LOG.warn("Failed doing drain", t);
752 }
753 LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
754 }
755
756
757
758
759
760
761
762
763
764 @VisibleForTesting
765 void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
766 if (entries.isEmpty()) {
767 return;
768 }
769
770
771
772
773
774
775 final int size = entries.size();
776 BucketEntry[] bucketEntries = new BucketEntry[size];
777
778
779 int index = 0;
780 while (cacheEnabled && index < size) {
781 RAMQueueEntry re = null;
782 try {
783 re = entries.get(index);
784 if (re == null) {
785 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
786 index++;
787 continue;
788 }
789 BucketEntry bucketEntry =
790 re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
791
792 bucketEntries[index] = bucketEntry;
793 if (ioErrorStartTime > 0) {
794 ioErrorStartTime = -1;
795 }
796 index++;
797 } catch (BucketAllocatorException fle) {
798 LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
799
800 bucketEntries[index] = null;
801 index++;
802 } catch (CacheFullException cfe) {
803
804 if (!freeInProgress) {
805 freeSpace("Full!");
806 } else {
807 Thread.sleep(50);
808 }
809 } catch (IOException ioex) {
810
811 LOG.error("Failed writing to bucket cache", ioex);
812 checkIOErrorIsTolerated();
813 }
814 }
815
816
817 try {
818 ioEngine.sync();
819 } catch (IOException ioex) {
820 LOG.error("Failed syncing IO engine", ioex);
821 checkIOErrorIsTolerated();
822
823 for (int i = 0; i < entries.size(); ++i) {
824 if (bucketEntries[i] != null) {
825 bucketAllocator.freeBlock(bucketEntries[i].offset());
826 bucketEntries[i] = null;
827 }
828 }
829 }
830
831
832
833 for (int i = 0; i < size; ++i) {
834 BlockCacheKey key = entries.get(i).getKey();
835
836 if (bucketEntries[i] != null) {
837 backingMap.put(key, bucketEntries[i]);
838 }
839
840 RAMQueueEntry ramCacheEntry = ramCache.remove(key);
841 if (ramCacheEntry != null) {
842 heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
843 } else if (bucketEntries[i] != null){
844
845 IdLock.Entry lockEntry = null;
846 try {
847 lockEntry = offsetLock.getLockEntry(bucketEntries[i].offset());
848 if (backingMap.remove(key, bucketEntries[i])) {
849 blockEvicted(key, bucketEntries[i], false);
850 }
851 } catch (IOException e) {
852 LOG.warn("failed to free space for " + key, e);
853 } finally {
854 if (lockEntry != null) {
855 offsetLock.releaseLockEntry(lockEntry);
856 }
857 }
858 }
859 }
860
861 long used = bucketAllocator.getUsedSize();
862 if (used > acceptableSize()) {
863 freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
864 }
865 return;
866 }
867 }
868
869
870
871
872
873
874
875
876
877 @VisibleForTesting
878 static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
879 final List<RAMQueueEntry> receptical)
880 throws InterruptedException {
881
882
883 receptical.clear();
884 receptical.add(q.take());
885 q.drainTo(receptical);
886 return receptical;
887 }
888
889 private void persistToFile() throws IOException {
890 assert !cacheEnabled;
891 FileOutputStream fos = null;
892 ObjectOutputStream oos = null;
893 try {
894 if (!ioEngine.isPersistent())
895 throw new IOException(
896 "Attempt to persist non-persistent cache mappings!");
897 fos = new FileOutputStream(persistencePath, false);
898 oos = new ObjectOutputStream(fos);
899 oos.writeLong(cacheCapacity);
900 oos.writeUTF(ioEngine.getClass().getName());
901 oos.writeUTF(backingMap.getClass().getName());
902 oos.writeObject(deserialiserMap);
903 oos.writeObject(backingMap);
904 } finally {
905 if (oos != null) oos.close();
906 if (fos != null) fos.close();
907 }
908 }
909
910 @SuppressWarnings("unchecked")
911 private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
912 ClassNotFoundException {
913 File persistenceFile = new File(persistencePath);
914 if (!persistenceFile.exists()) {
915 return;
916 }
917 assert !cacheEnabled;
918 FileInputStream fis = null;
919 ObjectInputStream ois = null;
920 try {
921 if (!ioEngine.isPersistent())
922 throw new IOException(
923 "Attempt to restore non-persistent cache mappings!");
924 fis = new FileInputStream(persistencePath);
925 ois = new ObjectInputStream(fis);
926 long capacitySize = ois.readLong();
927 if (capacitySize != cacheCapacity)
928 throw new IOException("Mismatched cache capacity:"
929 + StringUtils.byteDesc(capacitySize) + ", expected: "
930 + StringUtils.byteDesc(cacheCapacity));
931 String ioclass = ois.readUTF();
932 String mapclass = ois.readUTF();
933 if (!ioEngine.getClass().getName().equals(ioclass))
934 throw new IOException("Class name for IO engine mismatch: " + ioclass
935 + ", expected:" + ioEngine.getClass().getName());
936 if (!backingMap.getClass().getName().equals(mapclass))
937 throw new IOException("Class name for cache map mismatch: " + mapclass
938 + ", expected:" + backingMap.getClass().getName());
939 UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
940 .readObject();
941 ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
942 (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
943 BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
944 backingMapFromFile, realCacheSize);
945 bucketAllocator = allocator;
946 deserialiserMap = deserMap;
947 backingMap = backingMapFromFile;
948 } finally {
949 if (ois != null) ois.close();
950 if (fis != null) fis.close();
951 if (!persistenceFile.delete()) {
952 throw new IOException("Failed deleting persistence file "
953 + persistenceFile.getAbsolutePath());
954 }
955 }
956 }
957
958
959
960
961
962
963 private void checkIOErrorIsTolerated() {
964 long now = EnvironmentEdgeManager.currentTime();
965 if (this.ioErrorStartTime > 0) {
966 if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
967 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
968 "ms, disabing cache, please check your IOEngine");
969 disableCache();
970 }
971 } else {
972 this.ioErrorStartTime = now;
973 }
974 }
975
976
977
978
979
980 private void disableCache() {
981 if (!cacheEnabled)
982 return;
983 cacheEnabled = false;
984 ioEngine.shutdown();
985 this.scheduleThreadPool.shutdown();
986 for (int i = 0; i < writerThreads.length; ++i)
987 writerThreads[i].interrupt();
988 this.ramCache.clear();
989 if (!ioEngine.isPersistent() || persistencePath == null) {
990 this.backingMap.clear();
991 }
992 }
993
994 private void join() throws InterruptedException {
995 for (int i = 0; i < writerThreads.length; ++i)
996 writerThreads[i].join();
997 }
998
999 @Override
1000 public void shutdown() {
1001 disableCache();
1002 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
1003 + "; path to write=" + persistencePath);
1004 if (ioEngine.isPersistent() && persistencePath != null) {
1005 try {
1006 join();
1007 persistToFile();
1008 } catch (IOException ex) {
1009 LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
1010 } catch (InterruptedException e) {
1011 LOG.warn("Failed to persist data on exit", e);
1012 }
1013 }
1014 }
1015
1016 @Override
1017 public CacheStats getStats() {
1018 return cacheStats;
1019 }
1020
1021 public BucketAllocator getAllocator() {
1022 return this.bucketAllocator;
1023 }
1024
1025 @Override
1026 public long heapSize() {
1027 return this.heapSize.get();
1028 }
1029
1030 @Override
1031 public long size() {
1032 return this.realCacheSize.get();
1033 }
1034
1035 @Override
1036 public long getFreeSize() {
1037 return this.bucketAllocator.getFreeSize();
1038 }
1039
1040 @Override
1041 public long getBlockCount() {
1042 return this.blockNumber.get();
1043 }
1044
1045 @Override
1046 public long getCurrentSize() {
1047 return this.bucketAllocator.getUsedSize();
1048 }
1049
1050
1051
1052
1053
1054
1055
1056
1057 @Override
1058 public int evictBlocksByHfileName(String hfileName) {
1059
1060
1061 Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName);
1062 if (keySet == null) {
1063 return 0;
1064 }
1065 int numEvicted = 0;
1066 List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
1067 for (BlockCacheKey key : keysForHFile) {
1068 if (evictBlock(key)) {
1069 ++numEvicted;
1070 }
1071 }
1072
1073 return numEvicted;
1074 }
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084 static class BucketEntry implements Serializable {
1085 private static final long serialVersionUID = -6741504807982257534L;
1086
1087
1088 static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
1089
1090 @Override
1091 public int compare(BucketEntry o1, BucketEntry o2) {
1092 return Long.compare(o2.accessCounter, o1.accessCounter);
1093 }
1094 };
1095
1096 private int offsetBase;
1097 private int length;
1098 private byte offset1;
1099 byte deserialiserIndex;
1100 private volatile long accessCounter;
1101 private BlockPriority priority;
1102
1103
1104
1105 private final long cachedTime = System.nanoTime();
1106
1107 BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
1108 setOffset(offset);
1109 this.length = length;
1110 this.accessCounter = accessCounter;
1111 if (inMemory) {
1112 this.priority = BlockPriority.MEMORY;
1113 } else {
1114 this.priority = BlockPriority.SINGLE;
1115 }
1116 }
1117
1118 long offset() {
1119 long o = ((long) offsetBase) & 0xFFFFFFFF;
1120 o += (((long) (offset1)) & 0xFF) << 32;
1121 return o << 8;
1122 }
1123
1124 private void setOffset(long value) {
1125 assert (value & 0xFF) == 0;
1126 value >>= 8;
1127 offsetBase = (int) value;
1128 offset1 = (byte) (value >> 32);
1129 }
1130
1131 public int getLength() {
1132 return length;
1133 }
1134
1135 protected CacheableDeserializer<Cacheable> deserializerReference(
1136 UniqueIndexMap<Integer> deserialiserMap) {
1137 return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
1138 .unmap(deserialiserIndex));
1139 }
1140
1141 protected void setDeserialiserReference(
1142 CacheableDeserializer<Cacheable> deserializer,
1143 UniqueIndexMap<Integer> deserialiserMap) {
1144 this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
1145 .getDeserialiserIdentifier()));
1146 }
1147
1148
1149
1150
1151 public void access(long accessCounter) {
1152 this.accessCounter = accessCounter;
1153 if (this.priority == BlockPriority.SINGLE) {
1154 this.priority = BlockPriority.MULTI;
1155 }
1156 }
1157
1158 public BlockPriority getPriority() {
1159 return this.priority;
1160 }
1161
1162 public long getCachedTime() {
1163 return cachedTime;
1164 }
1165 }
1166
1167
1168
1169
1170
1171
1172
1173 private class BucketEntryGroup implements Comparable<BucketEntryGroup> {
1174
1175 private CachedEntryQueue queue;
1176 private long totalSize = 0;
1177 private long bucketSize;
1178
1179 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
1180 this.bucketSize = bucketSize;
1181 queue = new CachedEntryQueue(bytesToFree, blockSize);
1182 totalSize = 0;
1183 }
1184
1185 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
1186 totalSize += block.getValue().getLength();
1187 queue.add(block);
1188 }
1189
1190 public long free(long toFree) {
1191 Map.Entry<BlockCacheKey, BucketEntry> entry;
1192 long freedBytes = 0;
1193 while ((entry = queue.pollLast()) != null) {
1194 evictBlock(entry.getKey());
1195 freedBytes += entry.getValue().getLength();
1196 if (freedBytes >= toFree) {
1197 return freedBytes;
1198 }
1199 }
1200 return freedBytes;
1201 }
1202
1203 public long overflow() {
1204 return totalSize - bucketSize;
1205 }
1206
1207 public long totalSize() {
1208 return totalSize;
1209 }
1210
1211 @Override
1212 public int compareTo(BucketEntryGroup that) {
1213 return Long.compare(this.overflow(), that.overflow());
1214 }
1215
1216 @Override
1217 public boolean equals(Object that) {
1218 return this == that;
1219 }
1220
1221 }
1222
1223
1224
1225
1226 @VisibleForTesting
1227 static class RAMQueueEntry {
1228 private BlockCacheKey key;
1229 private Cacheable data;
1230 private long accessCounter;
1231 private boolean inMemory;
1232
1233 public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
1234 boolean inMemory) {
1235 this.key = bck;
1236 this.data = data;
1237 this.accessCounter = accessCounter;
1238 this.inMemory = inMemory;
1239 }
1240
1241 public Cacheable getData() {
1242 return data;
1243 }
1244
1245 public BlockCacheKey getKey() {
1246 return key;
1247 }
1248
1249 public void access(long accessCounter) {
1250 this.accessCounter = accessCounter;
1251 }
1252
1253 public BucketEntry writeToCache(final IOEngine ioEngine,
1254 final BucketAllocator bucketAllocator,
1255 final UniqueIndexMap<Integer> deserialiserMap,
1256 final AtomicLong realCacheSize) throws CacheFullException, IOException,
1257 BucketAllocatorException {
1258 int len = data.getSerializedLength();
1259
1260 if (len == 0) return null;
1261 long offset = bucketAllocator.allocateBlock(len);
1262 BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
1263 bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
1264 try {
1265 if (data instanceof HFileBlock) {
1266 HFileBlock block = (HFileBlock) data;
1267 ByteBuffer sliceBuf = block.getBufferReadOnlyWithHeader();
1268 sliceBuf.rewind();
1269 assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE ||
1270 len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
1271 ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
1272 block.serializeExtraInfo(extraInfoBuffer);
1273 ioEngine.write(sliceBuf, offset);
1274 ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
1275 } else {
1276 ByteBuffer bb = ByteBuffer.allocate(len);
1277 data.serialize(bb);
1278 ioEngine.write(bb, offset);
1279 }
1280 } catch (IOException ioe) {
1281
1282 bucketAllocator.freeBlock(offset);
1283 throw ioe;
1284 }
1285
1286 realCacheSize.addAndGet(len);
1287 return bucketEntry;
1288 }
1289 }
1290
1291
1292
1293
1294
1295 void stopWriterThreads() throws InterruptedException {
1296 for (WriterThread writerThread : writerThreads) {
1297 writerThread.disableWriter();
1298 writerThread.interrupt();
1299 writerThread.join();
1300 }
1301 }
1302
1303 @Override
1304 public Iterator<CachedBlock> iterator() {
1305
1306 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
1307 this.backingMap.entrySet().iterator();
1308 return new Iterator<CachedBlock>() {
1309 private final long now = System.nanoTime();
1310
1311 @Override
1312 public boolean hasNext() {
1313 return i.hasNext();
1314 }
1315
1316 @Override
1317 public CachedBlock next() {
1318 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
1319 return new CachedBlock() {
1320 @Override
1321 public String toString() {
1322 return BlockCacheUtil.toString(this, now);
1323 }
1324
1325 @Override
1326 public BlockPriority getBlockPriority() {
1327 return e.getValue().getPriority();
1328 }
1329
1330 @Override
1331 public BlockType getBlockType() {
1332
1333 return null;
1334 }
1335
1336 @Override
1337 public long getOffset() {
1338 return e.getKey().getOffset();
1339 }
1340
1341 @Override
1342 public long getSize() {
1343 return e.getValue().getLength();
1344 }
1345
1346 @Override
1347 public long getCachedTime() {
1348 return e.getValue().getCachedTime();
1349 }
1350
1351 @Override
1352 public String getFilename() {
1353 return e.getKey().getHfileName();
1354 }
1355
1356 @Override
1357 public int compareTo(CachedBlock other) {
1358 int diff = this.getFilename().compareTo(other.getFilename());
1359 if (diff != 0) return diff;
1360
1361 diff = Long.compare(this.getOffset(), other.getOffset());
1362 if (diff != 0) return diff;
1363 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
1364 throw new IllegalStateException("" + this.getCachedTime() + ", " +
1365 other.getCachedTime());
1366 }
1367 return Long.compare(other.getCachedTime(), this.getCachedTime());
1368 }
1369
1370 @Override
1371 public int hashCode() {
1372 return e.getKey().hashCode();
1373 }
1374
1375 @Override
1376 public boolean equals(Object obj) {
1377 if (obj instanceof CachedBlock) {
1378 CachedBlock cb = (CachedBlock)obj;
1379 return compareTo(cb) == 0;
1380 } else {
1381 return false;
1382 }
1383 }
1384 };
1385 }
1386
1387 @Override
1388 public void remove() {
1389 throw new UnsupportedOperationException();
1390 }
1391 };
1392 }
1393
1394 @Override
1395 public BlockCache[] getBlockCaches() {
1396 return null;
1397 }
1398 }