001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021package org.apache.hadoop.hbase.io.hfile.bucket; 022 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.FileNotFoundException; 026import java.io.FileOutputStream; 027import java.io.IOException; 028import java.io.ObjectInputStream; 029import java.io.ObjectOutputStream; 030import java.io.Serializable; 031import java.nio.ByteBuffer; 032import java.util.ArrayList; 033import java.util.Comparator; 034import java.util.HashSet; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.NavigableSet; 039import java.util.PriorityQueue; 040import java.util.Set; 041import java.util.concurrent.ArrayBlockingQueue; 042import java.util.concurrent.BlockingQueue; 043import java.util.concurrent.ConcurrentHashMap; 044import java.util.concurrent.ConcurrentMap; 045import java.util.concurrent.ConcurrentSkipListSet; 046import java.util.concurrent.Executors; 047import java.util.concurrent.ScheduledExecutorService; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.atomic.AtomicInteger; 050import java.util.concurrent.atomic.AtomicLong; 051import java.util.concurrent.atomic.LongAdder; 052import java.util.concurrent.locks.Lock; 053import java.util.concurrent.locks.ReentrantLock; 054import java.util.concurrent.locks.ReentrantReadWriteLock; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.hbase.HBaseConfiguration; 057import org.apache.hadoop.hbase.io.HeapSize; 058import org.apache.hadoop.hbase.io.hfile.BlockCache; 059import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 060import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 061import org.apache.hadoop.hbase.io.hfile.BlockPriority; 062import org.apache.hadoop.hbase.io.hfile.BlockType; 063import org.apache.hadoop.hbase.io.hfile.CacheStats; 064import org.apache.hadoop.hbase.io.hfile.Cacheable; 065import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; 066import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; 067import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; 068import org.apache.hadoop.hbase.io.hfile.CachedBlock; 069import org.apache.hadoop.hbase.io.hfile.HFileBlock; 070import org.apache.hadoop.hbase.nio.ByteBuff; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 072import org.apache.hadoop.hbase.util.HasThread; 073import org.apache.hadoop.hbase.util.IdReadWriteLock; 074import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; 075import org.apache.hadoop.hbase.util.UnsafeAvailChecker; 076import org.apache.hadoop.util.StringUtils; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 082import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 083import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 084 085/** 086 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses 087 * BucketCache#ramCache and BucketCache#backingMap in order to 088 * determine if a given element is in the cache. The bucket cache can use on-heap or 089 * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to 090 * store/read the block data. 091 * 092 * <p>Eviction is via a similar algorithm as used in 093 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 094 * 095 * <p>BucketCache can be used as mainly a block cache (see 096 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with 097 * LruBlockCache to decrease CMS GC and heap fragmentation. 098 * 099 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store 100 * blocks) to enlarge cache space via 101 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache} 102 */ 103@InterfaceAudience.Private 104public class BucketCache implements BlockCache, HeapSize { 105 private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class); 106 107 /** Priority buckets config */ 108 static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; 109 static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; 110 static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; 111 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; 112 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; 113 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; 114 115 /** Priority buckets */ 116 @VisibleForTesting 117 static final float DEFAULT_SINGLE_FACTOR = 0.25f; 118 static final float DEFAULT_MULTI_FACTOR = 0.50f; 119 static final float DEFAULT_MEMORY_FACTOR = 0.25f; 120 static final float DEFAULT_MIN_FACTOR = 0.85f; 121 122 private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; 123 private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; 124 125 // Number of blocks to clear for each of the bucket size that is full 126 private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; 127 128 /** Statistics thread */ 129 private static final int statThreadPeriod = 5 * 60; 130 131 final static int DEFAULT_WRITER_THREADS = 3; 132 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; 133 134 // Store/read block data 135 final IOEngine ioEngine; 136 137 // Store the block in this map before writing it to cache 138 @VisibleForTesting 139 final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache; 140 // In this map, store the block's meta data like offset, length 141 @VisibleForTesting 142 ConcurrentMap<BlockCacheKey, BucketEntry> backingMap; 143 144 /** 145 * Flag if the cache is enabled or not... We shut it off if there are IO 146 * errors for some time, so that Bucket IO exceptions/errors don't bring down 147 * the HBase server. 148 */ 149 private volatile boolean cacheEnabled; 150 151 /** 152 * A list of writer queues. We have a queue per {@link WriterThread} we have running. 153 * In other words, the work adding blocks to the BucketCache is divided up amongst the 154 * running WriterThreads. Its done by taking hash of the cache key modulo queue count. 155 * WriterThread when it runs takes whatever has been recently added and 'drains' the entries 156 * to the BucketCache. It then updates the ramCache and backingMap accordingly. 157 */ 158 @VisibleForTesting 159 final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); 160 @VisibleForTesting 161 final WriterThread[] writerThreads; 162 163 /** Volatile boolean to track if free space is in process or not */ 164 private volatile boolean freeInProgress = false; 165 private final Lock freeSpaceLock = new ReentrantLock(); 166 167 private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>(); 168 169 private final LongAdder realCacheSize = new LongAdder(); 170 private final LongAdder heapSize = new LongAdder(); 171 /** Current number of cached elements */ 172 private final LongAdder blockNumber = new LongAdder(); 173 174 /** Cache access count (sequential ID) */ 175 private final AtomicLong accessCount = new AtomicLong(); 176 177 private static final int DEFAULT_CACHE_WAIT_TIME = 50; 178 // Used in test now. If the flag is false and the cache speed is very fast, 179 // bucket cache will skip some blocks when caching. If the flag is true, we 180 // will wait blocks flushed to IOEngine for some time when caching 181 boolean wait_when_cache = false; 182 183 private final BucketCacheStats cacheStats = new BucketCacheStats(); 184 185 private final String persistencePath; 186 private final long cacheCapacity; 187 /** Approximate block size */ 188 private final long blockSize; 189 190 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ 191 private final int ioErrorsTolerationDuration; 192 // 1 min 193 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; 194 195 // Start time of first IO error when reading or writing IO Engine, it will be 196 // reset after a successful read/write. 197 private volatile long ioErrorStartTime = -1; 198 199 /** 200 * A ReentrantReadWriteLock to lock on a particular block identified by offset. 201 * The purpose of this is to avoid freeing the block which is being read. 202 * <p> 203 * Key set of offsets in BucketCache is limited so soft reference is the best choice here. 204 */ 205 @VisibleForTesting 206 final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT); 207 208 private final NavigableSet<BlockCacheKey> blocksByHFile = 209 new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() { 210 @Override 211 public int compare(BlockCacheKey a, BlockCacheKey b) { 212 int nameComparison = a.getHfileName().compareTo(b.getHfileName()); 213 if (nameComparison != 0) { 214 return nameComparison; 215 } 216 217 if (a.getOffset() == b.getOffset()) { 218 return 0; 219 } else if (a.getOffset() < b.getOffset()) { 220 return -1; 221 } 222 return 1; 223 } 224 }); 225 226 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 227 private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, 228 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); 229 230 // Allocate or free space for the block 231 private BucketAllocator bucketAllocator; 232 233 /** Acceptable size of cache (no evictions if size < acceptable) */ 234 private float acceptableFactor; 235 236 /** Minimum threshold of cache (when evicting, evict until size < min) */ 237 private float minFactor; 238 239 /** Free this floating point factor of extra blocks when evicting. For example free the number of blocks requested * (1 + extraFreeFactor) */ 240 private float extraFreeFactor; 241 242 /** Single access bucket size */ 243 private float singleFactor; 244 245 /** Multiple access bucket size */ 246 private float multiFactor; 247 248 /** In-memory bucket size */ 249 private float memoryFactor; 250 251 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 252 int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, 253 IOException { 254 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 255 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 256 } 257 258 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 259 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 260 Configuration conf) 261 throws FileNotFoundException, IOException { 262 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 263 this.writerThreads = new WriterThread[writerThreadNum]; 264 long blockNumCapacity = capacity / blockSize; 265 if (blockNumCapacity >= Integer.MAX_VALUE) { 266 // Enough for about 32TB of cache! 267 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 268 } 269 270 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 271 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 272 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 273 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 274 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 275 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 276 277 sanityCheckConfigs(); 278 279 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor + 280 ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor + 281 ", memoryFactor: " + memoryFactor); 282 283 this.cacheCapacity = capacity; 284 this.persistencePath = persistencePath; 285 this.blockSize = blockSize; 286 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 287 288 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 289 for (int i = 0; i < writerThreads.length; ++i) { 290 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 291 } 292 293 assert writerQueues.size() == writerThreads.length; 294 this.ramCache = new ConcurrentHashMap<>(); 295 296 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 297 298 if (ioEngine.isPersistent() && persistencePath != null) { 299 try { 300 retrieveFromFile(bucketSizes); 301 } catch (IOException ioex) { 302 LOG.error("Can't restore from file because of", ioex); 303 } catch (ClassNotFoundException cnfe) { 304 LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe); 305 throw new RuntimeException(cnfe); 306 } 307 } 308 final String threadName = Thread.currentThread().getName(); 309 this.cacheEnabled = true; 310 for (int i = 0; i < writerThreads.length; ++i) { 311 writerThreads[i] = new WriterThread(writerQueues.get(i)); 312 writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 313 writerThreads[i].setDaemon(true); 314 } 315 startWriterThreads(); 316 317 // Run the statistics thread periodically to print the cache statistics log 318 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 319 // every five minutes. 320 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), 321 statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); 322 LOG.info("Started bucket cache; ioengine=" + ioEngineName + 323 ", capacity=" + StringUtils.byteDesc(capacity) + 324 ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" + 325 writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" + 326 persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); 327 } 328 329 private void sanityCheckConfigs() { 330 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 331 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 332 Preconditions.checkArgument(minFactor <= acceptableFactor, MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 333 Preconditions.checkArgument(extraFreeFactor >= 0, EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 334 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 335 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 336 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 337 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, SINGLE_FACTOR_CONFIG_NAME + ", " + 338 MULTI_FACTOR_CONFIG_NAME + ", and " + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 339 } 340 341 /** 342 * Called by the constructor to start the writer threads. Used by tests that need to override 343 * starting the threads. 344 */ 345 @VisibleForTesting 346 protected void startWriterThreads() { 347 for (WriterThread thread : writerThreads) { 348 thread.start(); 349 } 350 } 351 352 @VisibleForTesting 353 boolean isCacheEnabled() { 354 return this.cacheEnabled; 355 } 356 357 @Override 358 public long getMaxSize() { 359 return this.cacheCapacity; 360 } 361 362 public String getIoEngine() { 363 return ioEngine.toString(); 364 } 365 366 /** 367 * Get the IOEngine from the IO engine name 368 * @param ioEngineName 369 * @param capacity 370 * @param persistencePath 371 * @return the IOEngine 372 * @throws IOException 373 */ 374 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 375 throws IOException { 376 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 377 // In order to make the usage simple, we only need the prefix 'files:' in 378 // document whether one or multiple file(s), but also support 'file:' for 379 // the compatibility 380 String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1) 381 .split(FileIOEngine.FILE_DELIMITER); 382 return new FileIOEngine(capacity, persistencePath != null, filePaths); 383 } else if (ioEngineName.startsWith("offheap")) { 384 return new ByteBufferIOEngine(capacity); 385 } else if (ioEngineName.startsWith("mmap:")) { 386 return new FileMmapEngine(ioEngineName.substring(5), capacity); 387 } else { 388 throw new IllegalArgumentException( 389 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 390 } 391 } 392 393 /** 394 * Cache the block with the specified name and buffer. 395 * @param cacheKey block's cache key 396 * @param buf block buffer 397 */ 398 @Override 399 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 400 cacheBlock(cacheKey, buf, false); 401 } 402 403 /** 404 * Cache the block with the specified name and buffer. 405 * @param cacheKey block's cache key 406 * @param cachedItem block buffer 407 * @param inMemory if block is in-memory 408 */ 409 @Override 410 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 411 cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); 412 } 413 414 /** 415 * Cache the block to ramCache 416 * @param cacheKey block's cache key 417 * @param cachedItem block buffer 418 * @param inMemory if block is in-memory 419 * @param wait if true, blocking wait when queue is full 420 */ 421 private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 422 boolean wait) { 423 if (cacheEnabled) { 424 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 425 if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) { 426 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 427 } 428 } else { 429 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 430 } 431 } 432 } 433 434 private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 435 boolean inMemory, boolean wait) { 436 if (!cacheEnabled) { 437 return; 438 } 439 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 440 // Stuff the entry into the RAM cache so it can get drained to the persistent store 441 RAMQueueEntry re = 442 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); 443 /** 444 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 445 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 446 * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if 447 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 448 * one, then the heap size will mess up (HBASE-20789) 449 */ 450 if (ramCache.putIfAbsent(cacheKey, re) != null) { 451 return; 452 } 453 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 454 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 455 boolean successfulAddition = false; 456 if (wait) { 457 try { 458 successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); 459 } catch (InterruptedException e) { 460 Thread.currentThread().interrupt(); 461 } 462 } else { 463 successfulAddition = bq.offer(re); 464 } 465 if (!successfulAddition) { 466 ramCache.remove(cacheKey); 467 cacheStats.failInsert(); 468 } else { 469 this.blockNumber.increment(); 470 this.heapSize.add(cachedItem.heapSize()); 471 blocksByHFile.add(cacheKey); 472 } 473 } 474 475 /** 476 * Get the buffer of the block with the specified key. 477 * @param key block's cache key 478 * @param caching true if the caller caches blocks on cache misses 479 * @param repeat Whether this is a repeat lookup for the same block 480 * @param updateCacheMetrics Whether we should update cache metrics or not 481 * @return buffer of specified cache key, or null if not in cache 482 */ 483 @Override 484 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 485 boolean updateCacheMetrics) { 486 if (!cacheEnabled) { 487 return null; 488 } 489 RAMQueueEntry re = ramCache.get(key); 490 if (re != null) { 491 if (updateCacheMetrics) { 492 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 493 } 494 re.access(accessCount.incrementAndGet()); 495 return re.getData(); 496 } 497 BucketEntry bucketEntry = backingMap.get(key); 498 if (bucketEntry != null) { 499 long start = System.nanoTime(); 500 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 501 try { 502 lock.readLock().lock(); 503 // We can not read here even if backingMap does contain the given key because its offset 504 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 505 // existence here. 506 if (bucketEntry.equals(backingMap.get(key))) { 507 // TODO : change this area - should be removed after server cells and 508 // 12295 are available 509 int len = bucketEntry.getLength(); 510 if (LOG.isTraceEnabled()) { 511 LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len); 512 } 513 Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len, 514 bucketEntry.deserializerReference(this.deserialiserMap)); 515 long timeTaken = System.nanoTime() - start; 516 if (updateCacheMetrics) { 517 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 518 cacheStats.ioHit(timeTaken); 519 } 520 if (cachedBlock.getMemoryType() == MemoryType.SHARED) { 521 bucketEntry.incrementRefCountAndGet(); 522 } 523 bucketEntry.access(accessCount.incrementAndGet()); 524 if (this.ioErrorStartTime > 0) { 525 ioErrorStartTime = -1; 526 } 527 return cachedBlock; 528 } 529 } catch (IOException ioex) { 530 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 531 checkIOErrorIsTolerated(); 532 } finally { 533 lock.readLock().unlock(); 534 } 535 } 536 if (!repeat && updateCacheMetrics) { 537 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 538 } 539 return null; 540 } 541 542 @VisibleForTesting 543 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { 544 bucketAllocator.freeBlock(bucketEntry.offset()); 545 realCacheSize.add(-1 * bucketEntry.getLength()); 546 blocksByHFile.remove(cacheKey); 547 if (decrementBlockNumber) { 548 this.blockNumber.decrement(); 549 } 550 } 551 552 @Override 553 public boolean evictBlock(BlockCacheKey cacheKey) { 554 return evictBlock(cacheKey, true); 555 } 556 557 // does not check for the ref count. Just tries to evict it if found in the 558 // bucket map 559 private boolean forceEvict(BlockCacheKey cacheKey) { 560 if (!cacheEnabled) { 561 return false; 562 } 563 RAMQueueEntry removedBlock = checkRamCache(cacheKey); 564 BucketEntry bucketEntry = backingMap.get(cacheKey); 565 if (bucketEntry == null) { 566 if (removedBlock != null) { 567 cacheStats.evicted(0, cacheKey.isPrimary()); 568 return true; 569 } else { 570 return false; 571 } 572 } 573 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 574 try { 575 lock.writeLock().lock(); 576 if (backingMap.remove(cacheKey, bucketEntry)) { 577 blockEvicted(cacheKey, bucketEntry, removedBlock == null); 578 } else { 579 return false; 580 } 581 } finally { 582 lock.writeLock().unlock(); 583 } 584 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 585 return true; 586 } 587 588 private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) { 589 RAMQueueEntry removedBlock = ramCache.remove(cacheKey); 590 if (removedBlock != null) { 591 this.blockNumber.decrement(); 592 this.heapSize.add(-1 * removedBlock.getData().heapSize()); 593 } 594 return removedBlock; 595 } 596 597 public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) { 598 if (!cacheEnabled) { 599 return false; 600 } 601 RAMQueueEntry removedBlock = checkRamCache(cacheKey); 602 BucketEntry bucketEntry = backingMap.get(cacheKey); 603 if (bucketEntry == null) { 604 if (removedBlock != null) { 605 cacheStats.evicted(0, cacheKey.isPrimary()); 606 return true; 607 } else { 608 return false; 609 } 610 } 611 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 612 try { 613 lock.writeLock().lock(); 614 int refCount = bucketEntry.getRefCount(); 615 if (refCount == 0) { 616 if (backingMap.remove(cacheKey, bucketEntry)) { 617 blockEvicted(cacheKey, bucketEntry, removedBlock == null); 618 } else { 619 return false; 620 } 621 } else { 622 if(!deletedBlock) { 623 if (LOG.isDebugEnabled()) { 624 LOG.debug("This block " + cacheKey + " is still referred by " + refCount 625 + " readers. Can not be freed now"); 626 } 627 return false; 628 } else { 629 if (LOG.isDebugEnabled()) { 630 LOG.debug("This block " + cacheKey + " is still referred by " + refCount 631 + " readers. Can not be freed now. Hence will mark this" 632 + " for evicting at a later point"); 633 } 634 bucketEntry.markForEvict(); 635 } 636 } 637 } finally { 638 lock.writeLock().unlock(); 639 } 640 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 641 return true; 642 } 643 644 /* 645 * Statistics thread. Periodically output cache statistics to the log. 646 */ 647 private static class StatisticsThread extends Thread { 648 private final BucketCache bucketCache; 649 650 public StatisticsThread(BucketCache bucketCache) { 651 super("BucketCacheStatsThread"); 652 setDaemon(true); 653 this.bucketCache = bucketCache; 654 } 655 656 @Override 657 public void run() { 658 bucketCache.logStats(); 659 } 660 } 661 662 public void logStats() { 663 long totalSize = bucketAllocator.getTotalSize(); 664 long usedSize = bucketAllocator.getUsedSize(); 665 long freeSize = totalSize - usedSize; 666 long cacheSize = getRealCacheSize(); 667 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + 668 "totalSize=" + StringUtils.byteDesc(totalSize) + ", " + 669 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + 670 "usedSize=" + StringUtils.byteDesc(usedSize) +", " + 671 "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " + 672 "accesses=" + cacheStats.getRequestCount() + ", " + 673 "hits=" + cacheStats.getHitCount() + ", " + 674 "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " + 675 "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " + 676 "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : 677 (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + 678 "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + 679 "cachingHits=" + cacheStats.getHitCachingCount() + ", " + 680 "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : 681 (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + 682 "evictions=" + cacheStats.getEvictionCount() + ", " + 683 "evicted=" + cacheStats.getEvictedCount() + ", " + 684 "evictedPerRun=" + cacheStats.evictedPerEviction()); 685 cacheStats.reset(); 686 } 687 688 public long getRealCacheSize() { 689 return this.realCacheSize.sum(); 690 } 691 692 private long acceptableSize() { 693 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 694 } 695 696 @VisibleForTesting 697 long getPartitionSize(float partitionFactor) { 698 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 699 } 700 701 /** 702 * Return the count of bucketSizeinfos still need free space 703 */ 704 private int bucketSizesAboveThresholdCount(float minFactor) { 705 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 706 int fullCount = 0; 707 for (int i = 0; i < stats.length; i++) { 708 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 709 freeGoal = Math.max(freeGoal, 1); 710 if (stats[i].freeCount() < freeGoal) { 711 fullCount++; 712 } 713 } 714 return fullCount; 715 } 716 717 /** 718 * This method will find the buckets that are minimally occupied 719 * and are not reference counted and will free them completely 720 * without any constraint on the access times of the elements, 721 * and as a process will completely free at most the number of buckets 722 * passed, sometimes it might not due to changing refCounts 723 * 724 * @param completelyFreeBucketsNeeded number of buckets to free 725 **/ 726 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 727 if (completelyFreeBucketsNeeded != 0) { 728 // First we will build a set where the offsets are reference counted, usually 729 // this set is small around O(Handler Count) unless something else is wrong 730 Set<Integer> inUseBuckets = new HashSet<Integer>(); 731 for (BucketEntry entry : backingMap.values()) { 732 if (entry.getRefCount() != 0) { 733 inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset())); 734 } 735 } 736 737 Set<Integer> candidateBuckets = bucketAllocator.getLeastFilledBuckets( 738 inUseBuckets, completelyFreeBucketsNeeded); 739 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 740 if (candidateBuckets.contains(bucketAllocator 741 .getBucketIndex(entry.getValue().offset()))) { 742 evictBlock(entry.getKey(), false); 743 } 744 } 745 } 746 } 747 748 /** 749 * Free the space if the used size reaches acceptableSize() or one size block 750 * couldn't be allocated. When freeing the space, we use the LRU algorithm and 751 * ensure there must be some blocks evicted 752 * @param why Why we are being called 753 */ 754 private void freeSpace(final String why) { 755 // Ensure only one freeSpace progress at a time 756 if (!freeSpaceLock.tryLock()) { 757 return; 758 } 759 try { 760 freeInProgress = true; 761 long bytesToFreeWithoutExtra = 0; 762 // Calculate free byte for each bucketSizeinfo 763 StringBuilder msgBuffer = LOG.isDebugEnabled()? new StringBuilder(): null; 764 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 765 long[] bytesToFreeForBucket = new long[stats.length]; 766 for (int i = 0; i < stats.length; i++) { 767 bytesToFreeForBucket[i] = 0; 768 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 769 freeGoal = Math.max(freeGoal, 1); 770 if (stats[i].freeCount() < freeGoal) { 771 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 772 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 773 if (msgBuffer != null) { 774 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 775 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 776 } 777 } 778 } 779 if (msgBuffer != null) { 780 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 781 } 782 783 if (bytesToFreeWithoutExtra <= 0) { 784 return; 785 } 786 long currentSize = bucketAllocator.getUsedSize(); 787 long totalSize = bucketAllocator.getTotalSize(); 788 if (LOG.isDebugEnabled() && msgBuffer != null) { 789 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + 790 " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + 791 StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize)); 792 } 793 794 long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra 795 * (1 + extraFreeFactor)); 796 797 // Instantiate priority buckets 798 BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra, 799 blockSize, getPartitionSize(singleFactor)); 800 BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra, 801 blockSize, getPartitionSize(multiFactor)); 802 BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra, 803 blockSize, getPartitionSize(memoryFactor)); 804 805 // Scan entire map putting bucket entry into appropriate bucket entry 806 // group 807 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 808 switch (bucketEntryWithKey.getValue().getPriority()) { 809 case SINGLE: { 810 bucketSingle.add(bucketEntryWithKey); 811 break; 812 } 813 case MULTI: { 814 bucketMulti.add(bucketEntryWithKey); 815 break; 816 } 817 case MEMORY: { 818 bucketMemory.add(bucketEntryWithKey); 819 break; 820 } 821 } 822 } 823 824 PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<>(3, 825 Comparator.comparingLong(BucketEntryGroup::overflow)); 826 827 bucketQueue.add(bucketSingle); 828 bucketQueue.add(bucketMulti); 829 bucketQueue.add(bucketMemory); 830 831 int remainingBuckets = bucketQueue.size(); 832 long bytesFreed = 0; 833 834 BucketEntryGroup bucketGroup; 835 while ((bucketGroup = bucketQueue.poll()) != null) { 836 long overflow = bucketGroup.overflow(); 837 if (overflow > 0) { 838 long bucketBytesToFree = Math.min(overflow, 839 (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 840 bytesFreed += bucketGroup.free(bucketBytesToFree); 841 } 842 remainingBuckets--; 843 } 844 845 // Check and free if there are buckets that still need freeing of space 846 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 847 bucketQueue.clear(); 848 remainingBuckets = 3; 849 850 bucketQueue.add(bucketSingle); 851 bucketQueue.add(bucketMulti); 852 bucketQueue.add(bucketMemory); 853 854 while ((bucketGroup = bucketQueue.poll()) != null) { 855 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 856 bytesFreed += bucketGroup.free(bucketBytesToFree); 857 remainingBuckets--; 858 } 859 } 860 861 // Even after the above free we might still need freeing because of the 862 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 863 // there might be some buckets where the occupancy is very sparse and thus are not 864 // yielding the free for the other bucket sizes, the fix for this to evict some 865 // of the buckets, we do this by evicting the buckets that are least fulled 866 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * 867 bucketSizesAboveThresholdCount(1.0f)); 868 869 if (LOG.isDebugEnabled()) { 870 long single = bucketSingle.totalSize(); 871 long multi = bucketMulti.totalSize(); 872 long memory = bucketMemory.totalSize(); 873 if (LOG.isDebugEnabled()) { 874 LOG.debug("Bucket cache free space completed; " + "freed=" 875 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" 876 + StringUtils.byteDesc(totalSize) + ", " + "single=" 877 + StringUtils.byteDesc(single) + ", " + "multi=" 878 + StringUtils.byteDesc(multi) + ", " + "memory=" 879 + StringUtils.byteDesc(memory)); 880 } 881 } 882 883 } catch (Throwable t) { 884 LOG.warn("Failed freeing space", t); 885 } finally { 886 cacheStats.evict(); 887 freeInProgress = false; 888 freeSpaceLock.unlock(); 889 } 890 } 891 892 // This handles flushing the RAM cache to IOEngine. 893 @VisibleForTesting 894 class WriterThread extends HasThread { 895 private final BlockingQueue<RAMQueueEntry> inputQueue; 896 private volatile boolean writerEnabled = true; 897 898 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 899 super("BucketCacheWriterThread"); 900 this.inputQueue = queue; 901 } 902 903 // Used for test 904 @VisibleForTesting 905 void disableWriter() { 906 this.writerEnabled = false; 907 } 908 909 @Override 910 public void run() { 911 List<RAMQueueEntry> entries = new ArrayList<>(); 912 try { 913 while (cacheEnabled && writerEnabled) { 914 try { 915 try { 916 // Blocks 917 entries = getRAMQueueEntries(inputQueue, entries); 918 } catch (InterruptedException ie) { 919 if (!cacheEnabled) break; 920 } 921 doDrain(entries); 922 } catch (Exception ioe) { 923 LOG.error("WriterThread encountered error", ioe); 924 } 925 } 926 } catch (Throwable t) { 927 LOG.warn("Failed doing drain", t); 928 } 929 LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); 930 } 931 932 /** 933 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 934 * cache with a new block for the same cache key. there's a corner case: one thread cache a 935 * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another 936 * new block with the same cache key do the same thing for the same cache key, so if not evict 937 * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone 938 * but the bucketAllocator do not free its memory. 939 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 940 * cacheKey, Cacheable newBlock) 941 * @param key Block cache key 942 * @param bucketEntry Bucket entry to put into backingMap. 943 */ 944 private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 945 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 946 if (previousEntry != null && previousEntry != bucketEntry) { 947 ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset()); 948 lock.writeLock().lock(); 949 try { 950 blockEvicted(key, previousEntry, false); 951 } finally { 952 lock.writeLock().unlock(); 953 } 954 } 955 } 956 957 /** 958 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. 959 * Process all that are passed in even if failure being sure to remove from ramCache else we'll 960 * never undo the references and we'll OOME. 961 * @param entries Presumes list passed in here will be processed by this invocation only. No 962 * interference expected. 963 * @throws InterruptedException 964 */ 965 @VisibleForTesting 966 void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException { 967 if (entries.isEmpty()) { 968 return; 969 } 970 // This method is a little hard to follow. We run through the passed in entries and for each 971 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must 972 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 973 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 974 // filling ramCache. We do the clean up by again running through the passed in entries 975 // doing extra work when we find a non-null bucketEntries corresponding entry. 976 final int size = entries.size(); 977 BucketEntry[] bucketEntries = new BucketEntry[size]; 978 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 979 // when we go to add an entry by going around the loop again without upping the index. 980 int index = 0; 981 while (cacheEnabled && index < size) { 982 RAMQueueEntry re = null; 983 try { 984 re = entries.get(index); 985 if (re == null) { 986 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 987 index++; 988 continue; 989 } 990 BucketEntry bucketEntry = 991 re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize); 992 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 993 bucketEntries[index] = bucketEntry; 994 if (ioErrorStartTime > 0) { 995 ioErrorStartTime = -1; 996 } 997 index++; 998 } catch (BucketAllocatorException fle) { 999 LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle); 1000 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 1001 bucketEntries[index] = null; 1002 index++; 1003 } catch (CacheFullException cfe) { 1004 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 1005 if (!freeInProgress) { 1006 freeSpace("Full!"); 1007 } else { 1008 Thread.sleep(50); 1009 } 1010 } catch (IOException ioex) { 1011 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 1012 LOG.error("Failed writing to bucket cache", ioex); 1013 checkIOErrorIsTolerated(); 1014 } 1015 } 1016 1017 // Make sure data pages are written on media before we update maps. 1018 try { 1019 ioEngine.sync(); 1020 } catch (IOException ioex) { 1021 LOG.error("Failed syncing IO engine", ioex); 1022 checkIOErrorIsTolerated(); 1023 // Since we failed sync, free the blocks in bucket allocator 1024 for (int i = 0; i < entries.size(); ++i) { 1025 if (bucketEntries[i] != null) { 1026 bucketAllocator.freeBlock(bucketEntries[i].offset()); 1027 bucketEntries[i] = null; 1028 } 1029 } 1030 } 1031 1032 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1033 // success or error. 1034 for (int i = 0; i < size; ++i) { 1035 BlockCacheKey key = entries.get(i).getKey(); 1036 // Only add if non-null entry. 1037 if (bucketEntries[i] != null) { 1038 putIntoBackingMap(key, bucketEntries[i]); 1039 } 1040 // Always remove from ramCache even if we failed adding it to the block cache above. 1041 RAMQueueEntry ramCacheEntry = ramCache.remove(key); 1042 if (ramCacheEntry != null) { 1043 heapSize.add(-1 * entries.get(i).getData().heapSize()); 1044 } else if (bucketEntries[i] != null){ 1045 // Block should have already been evicted. Remove it and free space. 1046 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset()); 1047 try { 1048 lock.writeLock().lock(); 1049 if (backingMap.remove(key, bucketEntries[i])) { 1050 blockEvicted(key, bucketEntries[i], false); 1051 } 1052 } finally { 1053 lock.writeLock().unlock(); 1054 } 1055 } 1056 } 1057 1058 long used = bucketAllocator.getUsedSize(); 1059 if (used > acceptableSize()) { 1060 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1061 } 1062 return; 1063 } 1064 } 1065 1066 /** 1067 * Blocks until elements available in {@code q} then tries to grab as many as possible 1068 * before returning. 1069 * @param receptacle Where to stash the elements taken from queue. We clear before we use it 1070 * just in case. 1071 * @param q The queue to take from. 1072 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1073 */ 1074 @VisibleForTesting 1075 static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q, 1076 final List<RAMQueueEntry> receptacle) 1077 throws InterruptedException { 1078 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1079 // ok even if list grew to accommodate thousands. 1080 receptacle.clear(); 1081 receptacle.add(q.take()); 1082 q.drainTo(receptacle); 1083 return receptacle; 1084 } 1085 1086 private void persistToFile() throws IOException { 1087 assert !cacheEnabled; 1088 FileOutputStream fos = null; 1089 ObjectOutputStream oos = null; 1090 try { 1091 if (!ioEngine.isPersistent()) { 1092 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1093 } 1094 fos = new FileOutputStream(persistencePath, false); 1095 oos = new ObjectOutputStream(fos); 1096 oos.writeLong(cacheCapacity); 1097 oos.writeUTF(ioEngine.getClass().getName()); 1098 oos.writeUTF(backingMap.getClass().getName()); 1099 oos.writeObject(deserialiserMap); 1100 oos.writeObject(backingMap); 1101 } finally { 1102 if (oos != null) oos.close(); 1103 if (fos != null) fos.close(); 1104 } 1105 } 1106 1107 @SuppressWarnings("unchecked") 1108 private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException, 1109 ClassNotFoundException { 1110 File persistenceFile = new File(persistencePath); 1111 if (!persistenceFile.exists()) { 1112 return; 1113 } 1114 assert !cacheEnabled; 1115 FileInputStream fis = null; 1116 ObjectInputStream ois = null; 1117 try { 1118 if (!ioEngine.isPersistent()) 1119 throw new IOException( 1120 "Attempt to restore non-persistent cache mappings!"); 1121 fis = new FileInputStream(persistencePath); 1122 ois = new ObjectInputStream(fis); 1123 long capacitySize = ois.readLong(); 1124 if (capacitySize != cacheCapacity) 1125 throw new IOException("Mismatched cache capacity:" 1126 + StringUtils.byteDesc(capacitySize) + ", expected: " 1127 + StringUtils.byteDesc(cacheCapacity)); 1128 String ioclass = ois.readUTF(); 1129 String mapclass = ois.readUTF(); 1130 if (!ioEngine.getClass().getName().equals(ioclass)) 1131 throw new IOException("Class name for IO engine mismatch: " + ioclass 1132 + ", expected:" + ioEngine.getClass().getName()); 1133 if (!backingMap.getClass().getName().equals(mapclass)) 1134 throw new IOException("Class name for cache map mismatch: " + mapclass 1135 + ", expected:" + backingMap.getClass().getName()); 1136 UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois 1137 .readObject(); 1138 ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile = 1139 (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject(); 1140 BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes, 1141 backingMapFromFile, realCacheSize); 1142 bucketAllocator = allocator; 1143 deserialiserMap = deserMap; 1144 backingMap = backingMapFromFile; 1145 } finally { 1146 if (ois != null) ois.close(); 1147 if (fis != null) fis.close(); 1148 if (!persistenceFile.delete()) { 1149 throw new IOException("Failed deleting persistence file " 1150 + persistenceFile.getAbsolutePath()); 1151 } 1152 } 1153 } 1154 1155 /** 1156 * Check whether we tolerate IO error this time. If the duration of IOEngine 1157 * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the 1158 * cache 1159 */ 1160 private void checkIOErrorIsTolerated() { 1161 long now = EnvironmentEdgeManager.currentTime(); 1162 if (this.ioErrorStartTime > 0) { 1163 if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) { 1164 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration + 1165 "ms, disabling cache, please check your IOEngine"); 1166 disableCache(); 1167 } 1168 } else { 1169 this.ioErrorStartTime = now; 1170 } 1171 } 1172 1173 /** 1174 * Used to shut down the cache -or- turn it off in the case of something broken. 1175 */ 1176 private void disableCache() { 1177 if (!cacheEnabled) return; 1178 cacheEnabled = false; 1179 ioEngine.shutdown(); 1180 this.scheduleThreadPool.shutdown(); 1181 for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt(); 1182 this.ramCache.clear(); 1183 if (!ioEngine.isPersistent() || persistencePath == null) { 1184 // If persistent ioengine and a path, we will serialize out the backingMap. 1185 this.backingMap.clear(); 1186 } 1187 } 1188 1189 private void join() throws InterruptedException { 1190 for (int i = 0; i < writerThreads.length; ++i) 1191 writerThreads[i].join(); 1192 } 1193 1194 @Override 1195 public void shutdown() { 1196 disableCache(); 1197 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() 1198 + "; path to write=" + persistencePath); 1199 if (ioEngine.isPersistent() && persistencePath != null) { 1200 try { 1201 join(); 1202 persistToFile(); 1203 } catch (IOException ex) { 1204 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1205 } catch (InterruptedException e) { 1206 LOG.warn("Failed to persist data on exit", e); 1207 } 1208 } 1209 } 1210 1211 @Override 1212 public CacheStats getStats() { 1213 return cacheStats; 1214 } 1215 1216 public BucketAllocator getAllocator() { 1217 return this.bucketAllocator; 1218 } 1219 1220 @Override 1221 public long heapSize() { 1222 return this.heapSize.sum(); 1223 } 1224 1225 @Override 1226 public long size() { 1227 return this.realCacheSize.sum(); 1228 } 1229 1230 @Override 1231 public long getCurrentDataSize() { 1232 return size(); 1233 } 1234 1235 @Override 1236 public long getFreeSize() { 1237 return this.bucketAllocator.getFreeSize(); 1238 } 1239 1240 @Override 1241 public long getBlockCount() { 1242 return this.blockNumber.sum(); 1243 } 1244 1245 @Override 1246 public long getDataBlockCount() { 1247 return getBlockCount(); 1248 } 1249 1250 @Override 1251 public long getCurrentSize() { 1252 return this.bucketAllocator.getUsedSize(); 1253 } 1254 1255 /** 1256 * Evicts all blocks for a specific HFile. 1257 * <p> 1258 * This is used for evict-on-close to remove all blocks of a specific HFile. 1259 * 1260 * @return the number of blocks evicted 1261 */ 1262 @Override 1263 public int evictBlocksByHfileName(String hfileName) { 1264 Set<BlockCacheKey> keySet = blocksByHFile.subSet( 1265 new BlockCacheKey(hfileName, Long.MIN_VALUE), true, 1266 new BlockCacheKey(hfileName, Long.MAX_VALUE), true); 1267 1268 int numEvicted = 0; 1269 for (BlockCacheKey key : keySet) { 1270 if (evictBlock(key)) { 1271 ++numEvicted; 1272 } 1273 } 1274 1275 return numEvicted; 1276 } 1277 1278 /** 1279 * Item in cache. We expect this to be where most memory goes. Java uses 8 1280 * bytes just for object headers; after this, we want to use as little as 1281 * possible - so we only use 8 bytes, but in order to do so we end up messing 1282 * around with all this Java casting stuff. Offset stored as 5 bytes that make 1283 * up the long. Doubt we'll see devices this big for ages. Offsets are divided 1284 * by 256. So 5 bytes gives us 256TB or so. 1285 */ 1286 static class BucketEntry implements Serializable { 1287 private static final long serialVersionUID = -6741504807982257534L; 1288 1289 // access counter comparator, descending order 1290 static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() { 1291 1292 @Override 1293 public int compare(BucketEntry o1, BucketEntry o2) { 1294 return Long.compare(o2.accessCounter, o1.accessCounter); 1295 } 1296 }; 1297 1298 private int offsetBase; 1299 private int length; 1300 private byte offset1; 1301 byte deserialiserIndex; 1302 private volatile long accessCounter; 1303 private BlockPriority priority; 1304 1305 /** 1306 * Time this block was cached. Presumes we are created just before we are added to the cache. 1307 */ 1308 private final long cachedTime = System.nanoTime(); 1309 1310 BucketEntry(long offset, int length, long accessCounter, boolean inMemory) { 1311 setOffset(offset); 1312 this.length = length; 1313 this.accessCounter = accessCounter; 1314 if (inMemory) { 1315 this.priority = BlockPriority.MEMORY; 1316 } else { 1317 this.priority = BlockPriority.SINGLE; 1318 } 1319 } 1320 1321 long offset() { // Java has no unsigned numbers 1322 long o = ((long) offsetBase) & 0xFFFFFFFFL; //This needs the L cast otherwise it will be sign extended as a negative number. 1323 o += (((long) (offset1)) & 0xFF) << 32; //The 0xFF here does not need the L cast because it is treated as a positive int. 1324 return o << 8; 1325 } 1326 1327 private void setOffset(long value) { 1328 assert (value & 0xFF) == 0; 1329 value >>= 8; 1330 offsetBase = (int) value; 1331 offset1 = (byte) (value >> 32); 1332 } 1333 1334 public int getLength() { 1335 return length; 1336 } 1337 1338 protected CacheableDeserializer<Cacheable> deserializerReference( 1339 UniqueIndexMap<Integer> deserialiserMap) { 1340 return CacheableDeserializerIdManager.getDeserializer(deserialiserMap 1341 .unmap(deserialiserIndex)); 1342 } 1343 1344 protected void setDeserialiserReference( 1345 CacheableDeserializer<Cacheable> deserializer, 1346 UniqueIndexMap<Integer> deserialiserMap) { 1347 this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer 1348 .getDeserialiserIdentifier())); 1349 } 1350 1351 /** 1352 * Block has been accessed. Update its local access counter. 1353 */ 1354 public void access(long accessCounter) { 1355 this.accessCounter = accessCounter; 1356 if (this.priority == BlockPriority.SINGLE) { 1357 this.priority = BlockPriority.MULTI; 1358 } 1359 } 1360 1361 public BlockPriority getPriority() { 1362 return this.priority; 1363 } 1364 1365 public long getCachedTime() { 1366 return cachedTime; 1367 } 1368 1369 protected int getRefCount() { 1370 return 0; 1371 } 1372 1373 protected int incrementRefCountAndGet() { 1374 return 0; 1375 } 1376 1377 protected int decrementRefCountAndGet() { 1378 return 0; 1379 } 1380 1381 protected boolean isMarkedForEvict() { 1382 return false; 1383 } 1384 1385 protected void markForEvict() { 1386 // noop; 1387 } 1388 } 1389 1390 static class SharedMemoryBucketEntry extends BucketEntry { 1391 private static final long serialVersionUID = -2187147283772338481L; 1392 1393 // Set this when we were not able to forcefully evict the block 1394 private volatile boolean markedForEvict; 1395 private AtomicInteger refCount = new AtomicInteger(0); 1396 1397 SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) { 1398 super(offset, length, accessCounter, inMemory); 1399 } 1400 1401 @Override 1402 protected int getRefCount() { 1403 return this.refCount.get(); 1404 } 1405 1406 @Override 1407 protected int incrementRefCountAndGet() { 1408 return this.refCount.incrementAndGet(); 1409 } 1410 1411 @Override 1412 protected int decrementRefCountAndGet() { 1413 return this.refCount.decrementAndGet(); 1414 } 1415 1416 @Override 1417 protected boolean isMarkedForEvict() { 1418 return this.markedForEvict; 1419 } 1420 1421 @Override 1422 protected void markForEvict() { 1423 this.markedForEvict = true; 1424 } 1425 } 1426 1427 /** 1428 * Used to group bucket entries into priority buckets. There will be a 1429 * BucketEntryGroup for each priority (single, multi, memory). Once bucketed, 1430 * the eviction algorithm takes the appropriate number of elements out of each 1431 * according to configuration parameters and their relative sizes. 1432 */ 1433 private class BucketEntryGroup { 1434 1435 private CachedEntryQueue queue; 1436 private long totalSize = 0; 1437 private long bucketSize; 1438 1439 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1440 this.bucketSize = bucketSize; 1441 queue = new CachedEntryQueue(bytesToFree, blockSize); 1442 totalSize = 0; 1443 } 1444 1445 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1446 totalSize += block.getValue().getLength(); 1447 queue.add(block); 1448 } 1449 1450 public long free(long toFree) { 1451 Map.Entry<BlockCacheKey, BucketEntry> entry; 1452 long freedBytes = 0; 1453 // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free 1454 // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 1455 while ((entry = queue.pollLast()) != null) { 1456 if (evictBlock(entry.getKey(), false)) { 1457 freedBytes += entry.getValue().getLength(); 1458 } 1459 if (freedBytes >= toFree) { 1460 return freedBytes; 1461 } 1462 } 1463 return freedBytes; 1464 } 1465 1466 public long overflow() { 1467 return totalSize - bucketSize; 1468 } 1469 1470 public long totalSize() { 1471 return totalSize; 1472 } 1473 } 1474 1475 /** 1476 * Block Entry stored in the memory with key,data and so on 1477 */ 1478 @VisibleForTesting 1479 static class RAMQueueEntry { 1480 private BlockCacheKey key; 1481 private Cacheable data; 1482 private long accessCounter; 1483 private boolean inMemory; 1484 1485 public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, 1486 boolean inMemory) { 1487 this.key = bck; 1488 this.data = data; 1489 this.accessCounter = accessCounter; 1490 this.inMemory = inMemory; 1491 } 1492 1493 public Cacheable getData() { 1494 return data; 1495 } 1496 1497 public BlockCacheKey getKey() { 1498 return key; 1499 } 1500 1501 public void access(long accessCounter) { 1502 this.accessCounter = accessCounter; 1503 } 1504 1505 private BucketEntry getBucketEntry(IOEngine ioEngine, long offset, int len) { 1506 if (ioEngine.usesSharedMemory()) { 1507 if (UnsafeAvailChecker.isAvailable()) { 1508 return new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory); 1509 } else { 1510 return new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory); 1511 } 1512 } else { 1513 return new BucketEntry(offset, len, accessCounter, inMemory); 1514 } 1515 } 1516 1517 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator, 1518 final UniqueIndexMap<Integer> deserialiserMap, final LongAdder realCacheSize) 1519 throws IOException { 1520 int len = data.getSerializedLength(); 1521 // This cacheable thing can't be serialized 1522 if (len == 0) { 1523 return null; 1524 } 1525 long offset = bucketAllocator.allocateBlock(len); 1526 boolean succ = false; 1527 BucketEntry bucketEntry; 1528 try { 1529 bucketEntry = getBucketEntry(ioEngine, offset, len); 1530 bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); 1531 if (data instanceof HFileBlock) { 1532 // If an instance of HFileBlock, save on some allocations. 1533 HFileBlock block = (HFileBlock) data; 1534 ByteBuff sliceBuf = block.getBufferReadOnly(); 1535 ByteBuffer metadata = block.getMetaData(); 1536 ioEngine.write(sliceBuf, offset); 1537 ioEngine.write(metadata, offset + len - metadata.limit()); 1538 } else { 1539 ByteBuffer bb = ByteBuffer.allocate(len); 1540 data.serialize(bb, true); 1541 ioEngine.write(bb, offset); 1542 } 1543 succ = true; 1544 } finally { 1545 if (!succ) { 1546 bucketAllocator.freeBlock(offset); 1547 } 1548 } 1549 realCacheSize.add(len); 1550 return bucketEntry; 1551 } 1552 } 1553 1554 /** 1555 * Only used in test 1556 * @throws InterruptedException 1557 */ 1558 void stopWriterThreads() throws InterruptedException { 1559 for (WriterThread writerThread : writerThreads) { 1560 writerThread.disableWriter(); 1561 writerThread.interrupt(); 1562 writerThread.join(); 1563 } 1564 } 1565 1566 @Override 1567 public Iterator<CachedBlock> iterator() { 1568 // Don't bother with ramcache since stuff is in here only a little while. 1569 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = 1570 this.backingMap.entrySet().iterator(); 1571 return new Iterator<CachedBlock>() { 1572 private final long now = System.nanoTime(); 1573 1574 @Override 1575 public boolean hasNext() { 1576 return i.hasNext(); 1577 } 1578 1579 @Override 1580 public CachedBlock next() { 1581 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 1582 return new CachedBlock() { 1583 @Override 1584 public String toString() { 1585 return BlockCacheUtil.toString(this, now); 1586 } 1587 1588 @Override 1589 public BlockPriority getBlockPriority() { 1590 return e.getValue().getPriority(); 1591 } 1592 1593 @Override 1594 public BlockType getBlockType() { 1595 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 1596 return null; 1597 } 1598 1599 @Override 1600 public long getOffset() { 1601 return e.getKey().getOffset(); 1602 } 1603 1604 @Override 1605 public long getSize() { 1606 return e.getValue().getLength(); 1607 } 1608 1609 @Override 1610 public long getCachedTime() { 1611 return e.getValue().getCachedTime(); 1612 } 1613 1614 @Override 1615 public String getFilename() { 1616 return e.getKey().getHfileName(); 1617 } 1618 1619 @Override 1620 public int compareTo(CachedBlock other) { 1621 int diff = this.getFilename().compareTo(other.getFilename()); 1622 if (diff != 0) return diff; 1623 1624 diff = Long.compare(this.getOffset(), other.getOffset()); 1625 if (diff != 0) return diff; 1626 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1627 throw new IllegalStateException("" + this.getCachedTime() + ", " + 1628 other.getCachedTime()); 1629 } 1630 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1631 } 1632 1633 @Override 1634 public int hashCode() { 1635 return e.getKey().hashCode(); 1636 } 1637 1638 @Override 1639 public boolean equals(Object obj) { 1640 if (obj instanceof CachedBlock) { 1641 CachedBlock cb = (CachedBlock)obj; 1642 return compareTo(cb) == 0; 1643 } else { 1644 return false; 1645 } 1646 } 1647 }; 1648 } 1649 1650 @Override 1651 public void remove() { 1652 throw new UnsupportedOperationException(); 1653 } 1654 }; 1655 } 1656 1657 @Override 1658 public BlockCache[] getBlockCaches() { 1659 return null; 1660 } 1661 1662 @Override 1663 public void returnBlock(BlockCacheKey cacheKey, Cacheable block) { 1664 if (block.getMemoryType() == MemoryType.SHARED) { 1665 BucketEntry bucketEntry = backingMap.get(cacheKey); 1666 if (bucketEntry != null) { 1667 int refCount = bucketEntry.decrementRefCountAndGet(); 1668 if (refCount == 0 && bucketEntry.isMarkedForEvict()) { 1669 forceEvict(cacheKey); 1670 } 1671 } 1672 } 1673 } 1674 1675 @VisibleForTesting 1676 public int getRefCount(BlockCacheKey cacheKey) { 1677 BucketEntry bucketEntry = backingMap.get(cacheKey); 1678 if (bucketEntry != null) { 1679 return bucketEntry.getRefCount(); 1680 } 1681 return 0; 1682 } 1683 1684 float getAcceptableFactor() { 1685 return acceptableFactor; 1686 } 1687 1688 float getMinFactor() { 1689 return minFactor; 1690 } 1691 1692 float getExtraFreeFactor() { 1693 return extraFreeFactor; 1694 } 1695 1696 float getSingleFactor() { 1697 return singleFactor; 1698 } 1699 1700 float getMultiFactor() { 1701 return multiFactor; 1702 } 1703 1704 float getMemoryFactor() { 1705 return memoryFactor; 1706 } 1707}