001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021package org.apache.hadoop.hbase.io.hfile.bucket; 022 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.FileNotFoundException; 026import java.io.FileOutputStream; 027import java.io.IOException; 028import java.io.ObjectInputStream; 029import java.io.ObjectOutputStream; 030import java.io.Serializable; 031import java.nio.ByteBuffer; 032import java.util.ArrayList; 033import java.util.Comparator; 034import java.util.HashSet; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.NavigableSet; 039import java.util.PriorityQueue; 040import java.util.Set; 041import java.util.concurrent.ArrayBlockingQueue; 042import java.util.concurrent.BlockingQueue; 043import java.util.concurrent.ConcurrentHashMap; 044import java.util.concurrent.ConcurrentMap; 045import java.util.concurrent.ConcurrentSkipListSet; 046import java.util.concurrent.Executors; 047import java.util.concurrent.ScheduledExecutorService; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.atomic.AtomicInteger; 050import 
java.util.concurrent.atomic.AtomicLong; 051import java.util.concurrent.atomic.LongAdder; 052import java.util.concurrent.locks.Lock; 053import java.util.concurrent.locks.ReentrantLock; 054import java.util.concurrent.locks.ReentrantReadWriteLock; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.hbase.HBaseConfiguration; 057import org.apache.hadoop.hbase.io.HeapSize; 058import org.apache.hadoop.hbase.io.hfile.BlockCache; 059import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 060import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 061import org.apache.hadoop.hbase.io.hfile.BlockPriority; 062import org.apache.hadoop.hbase.io.hfile.BlockType; 063import org.apache.hadoop.hbase.io.hfile.CacheStats; 064import org.apache.hadoop.hbase.io.hfile.Cacheable; 065import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; 066import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; 067import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; 068import org.apache.hadoop.hbase.io.hfile.CachedBlock; 069import org.apache.hadoop.hbase.io.hfile.HFileBlock; 070import org.apache.hadoop.hbase.nio.ByteBuff; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 072import org.apache.hadoop.hbase.util.HasThread; 073import org.apache.hadoop.hbase.util.IdReadWriteLock; 074import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; 075import org.apache.hadoop.hbase.util.UnsafeAvailChecker; 076import org.apache.hadoop.util.StringUtils; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 082import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 083import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 084 085/** 086 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses 087 * BucketCache#ramCache and 
BucketCache#backingMap in order to 088 * determine if a given element is in the cache. The bucket cache can use on-heap or 089 * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to 090 * store/read the block data. 091 * 092 * <p>Eviction is via a similar algorithm as used in 093 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 094 * 095 * <p>BucketCache can be used as mainly a block cache (see 096 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with 097 * LruBlockCache to decrease CMS GC and heap fragmentation. 098 * 099 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store 100 * blocks) to enlarge cache space via 101 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache} 102 */ 103@InterfaceAudience.Private 104public class BucketCache implements BlockCache, HeapSize { 105 private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class); 106 107 /** Priority buckets config */ 108 static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; 109 static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; 110 static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; 111 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; 112 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; 113 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; 114 115 /** Priority buckets */ 116 @VisibleForTesting 117 static final float DEFAULT_SINGLE_FACTOR = 0.25f; 118 static final float DEFAULT_MULTI_FACTOR = 0.50f; 119 static final float DEFAULT_MEMORY_FACTOR = 0.25f; 120 static final float DEFAULT_MIN_FACTOR = 0.85f; 121 122 private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; 123 private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; 124 125 // Number of blocks to clear for each of the bucket size 
that is full 126 private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; 127 128 /** Statistics thread */ 129 private static final int statThreadPeriod = 5 * 60; 130 131 final static int DEFAULT_WRITER_THREADS = 3; 132 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; 133 134 // Store/read block data 135 final IOEngine ioEngine; 136 137 // Store the block in this map before writing it to cache 138 @VisibleForTesting 139 final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache; 140 // In this map, store the block's meta data like offset, length 141 @VisibleForTesting 142 ConcurrentMap<BlockCacheKey, BucketEntry> backingMap; 143 144 /** 145 * Flag if the cache is enabled or not... We shut it off if there are IO 146 * errors for some time, so that Bucket IO exceptions/errors don't bring down 147 * the HBase server. 148 */ 149 private volatile boolean cacheEnabled; 150 151 /** 152 * A list of writer queues. We have a queue per {@link WriterThread} we have running. 153 * In other words, the work adding blocks to the BucketCache is divided up amongst the 154 * running WriterThreads. Its done by taking hash of the cache key modulo queue count. 155 * WriterThread when it runs takes whatever has been recently added and 'drains' the entries 156 * to the BucketCache. It then updates the ramCache and backingMap accordingly. 
157 */ 158 @VisibleForTesting 159 final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); 160 @VisibleForTesting 161 final WriterThread[] writerThreads; 162 163 /** Volatile boolean to track if free space is in process or not */ 164 private volatile boolean freeInProgress = false; 165 private final Lock freeSpaceLock = new ReentrantLock(); 166 167 private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>(); 168 169 private final LongAdder realCacheSize = new LongAdder(); 170 private final LongAdder heapSize = new LongAdder(); 171 /** Current number of cached elements */ 172 private final LongAdder blockNumber = new LongAdder(); 173 174 /** Cache access count (sequential ID) */ 175 private final AtomicLong accessCount = new AtomicLong(); 176 177 private static final int DEFAULT_CACHE_WAIT_TIME = 50; 178 // Used in test now. If the flag is false and the cache speed is very fast, 179 // bucket cache will skip some blocks when caching. If the flag is true, we 180 // will wait blocks flushed to IOEngine for some time when caching 181 boolean wait_when_cache = false; 182 183 private final BucketCacheStats cacheStats = new BucketCacheStats(); 184 185 private final String persistencePath; 186 private final long cacheCapacity; 187 /** Approximate block size */ 188 private final long blockSize; 189 190 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ 191 private final int ioErrorsTolerationDuration; 192 // 1 min 193 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; 194 195 // Start time of first IO error when reading or writing IO Engine, it will be 196 // reset after a successful read/write. 197 private volatile long ioErrorStartTime = -1; 198 199 /** 200 * A ReentrantReadWriteLock to lock on a particular block identified by offset. 201 * The purpose of this is to avoid freeing the block which is being read. 
202 * <p> 203 * Key set of offsets in BucketCache is limited so soft reference is the best choice here. 204 */ 205 @VisibleForTesting 206 final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT); 207 208 private final NavigableSet<BlockCacheKey> blocksByHFile = 209 new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() { 210 @Override 211 public int compare(BlockCacheKey a, BlockCacheKey b) { 212 int nameComparison = a.getHfileName().compareTo(b.getHfileName()); 213 if (nameComparison != 0) { 214 return nameComparison; 215 } 216 217 if (a.getOffset() == b.getOffset()) { 218 return 0; 219 } else if (a.getOffset() < b.getOffset()) { 220 return -1; 221 } 222 return 1; 223 } 224 }); 225 226 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 227 private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, 228 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); 229 230 // Allocate or free space for the block 231 private BucketAllocator bucketAllocator; 232 233 /** Acceptable size of cache (no evictions if size < acceptable) */ 234 private float acceptableFactor; 235 236 /** Minimum threshold of cache (when evicting, evict until size < min) */ 237 private float minFactor; 238 239 /** Free this floating point factor of extra blocks when evicting. 
For example free the number of blocks requested * (1 + extraFreeFactor) */ 240 private float extraFreeFactor; 241 242 /** Single access bucket size */ 243 private float singleFactor; 244 245 /** Multiple access bucket size */ 246 private float multiFactor; 247 248 /** In-memory bucket size */ 249 private float memoryFactor; 250 251 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 252 int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, 253 IOException { 254 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 255 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 256 } 257 258 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 259 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 260 Configuration conf) 261 throws FileNotFoundException, IOException { 262 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 263 this.writerThreads = new WriterThread[writerThreadNum]; 264 long blockNumCapacity = capacity / blockSize; 265 if (blockNumCapacity >= Integer.MAX_VALUE) { 266 // Enough for about 32TB of cache! 
267 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 268 } 269 270 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 271 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 272 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 273 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 274 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 275 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 276 277 sanityCheckConfigs(); 278 279 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor + 280 ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor + 281 ", memoryFactor: " + memoryFactor); 282 283 this.cacheCapacity = capacity; 284 this.persistencePath = persistencePath; 285 this.blockSize = blockSize; 286 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 287 288 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 289 for (int i = 0; i < writerThreads.length; ++i) { 290 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 291 } 292 293 assert writerQueues.size() == writerThreads.length; 294 this.ramCache = new ConcurrentHashMap<>(); 295 296 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 297 298 if (ioEngine.isPersistent() && persistencePath != null) { 299 try { 300 retrieveFromFile(bucketSizes); 301 } catch (IOException ioex) { 302 LOG.error("Can't restore from file because of", ioex); 303 } catch (ClassNotFoundException cnfe) { 304 LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe); 305 throw new RuntimeException(cnfe); 306 } 307 } 308 final String threadName = Thread.currentThread().getName(); 309 this.cacheEnabled = true; 310 for (int i = 0; i < 
writerThreads.length; ++i) { 311 writerThreads[i] = new WriterThread(writerQueues.get(i)); 312 writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 313 writerThreads[i].setDaemon(true); 314 } 315 startWriterThreads(); 316 317 // Run the statistics thread periodically to print the cache statistics log 318 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 319 // every five minutes. 320 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), 321 statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); 322 LOG.info("Started bucket cache; ioengine=" + ioEngineName + 323 ", capacity=" + StringUtils.byteDesc(capacity) + 324 ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" + 325 writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" + 326 persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); 327 } 328 329 private void sanityCheckConfigs() { 330 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 331 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 332 Preconditions.checkArgument(minFactor <= acceptableFactor, MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 333 Preconditions.checkArgument(extraFreeFactor >= 0, EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 334 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 335 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 336 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 337 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, SINGLE_FACTOR_CONFIG_NAME + 
", " + 338 MULTI_FACTOR_CONFIG_NAME + ", and " + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 339 } 340 341 /** 342 * Called by the constructor to start the writer threads. Used by tests that need to override 343 * starting the threads. 344 */ 345 @VisibleForTesting 346 protected void startWriterThreads() { 347 for (WriterThread thread : writerThreads) { 348 thread.start(); 349 } 350 } 351 352 @VisibleForTesting 353 boolean isCacheEnabled() { 354 return this.cacheEnabled; 355 } 356 357 @Override 358 public long getMaxSize() { 359 return this.cacheCapacity; 360 } 361 362 public String getIoEngine() { 363 return ioEngine.toString(); 364 } 365 366 /** 367 * Get the IOEngine from the IO engine name 368 * @param ioEngineName 369 * @param capacity 370 * @param persistencePath 371 * @return the IOEngine 372 * @throws IOException 373 */ 374 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 375 throws IOException { 376 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 377 // In order to make the usage simple, we only need the prefix 'files:' in 378 // document whether one or multiple file(s), but also support 'file:' for 379 // the compatibility 380 String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1) 381 .split(FileIOEngine.FILE_DELIMITER); 382 return new FileIOEngine(capacity, persistencePath != null, filePaths); 383 } else if (ioEngineName.startsWith("offheap")) { 384 return new ByteBufferIOEngine(capacity); 385 } else if (ioEngineName.startsWith("mmap:")) { 386 return new FileMmapEngine(ioEngineName.substring(5), capacity); 387 } else { 388 throw new IllegalArgumentException( 389 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 390 } 391 } 392 393 /** 394 * Cache the block with the specified name and buffer. 
395 * @param cacheKey block's cache key 396 * @param buf block buffer 397 */ 398 @Override 399 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 400 cacheBlock(cacheKey, buf, false); 401 } 402 403 /** 404 * Cache the block with the specified name and buffer. 405 * @param cacheKey block's cache key 406 * @param cachedItem block buffer 407 * @param inMemory if block is in-memory 408 */ 409 @Override 410 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 411 cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); 412 } 413 414 /** 415 * Cache the block to ramCache 416 * @param cacheKey block's cache key 417 * @param cachedItem block buffer 418 * @param inMemory if block is in-memory 419 * @param wait if true, blocking wait when queue is full 420 */ 421 private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 422 boolean wait) { 423 if (cacheEnabled) { 424 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 425 if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) { 426 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 427 } 428 } else { 429 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 430 } 431 } 432 } 433 434 private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 435 boolean inMemory, boolean wait) { 436 if (!cacheEnabled) { 437 return; 438 } 439 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 440 // Stuff the entry into the RAM cache so it can get drained to the persistent store 441 RAMQueueEntry re = 442 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); 443 /** 444 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 445 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 446 * ramCache. 
But WriterThread will also remove entry from ramCache and update heap size, if 447 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 448 * one, then the heap size will mess up (HBASE-20789) 449 */ 450 if (ramCache.putIfAbsent(cacheKey, re) != null) { 451 return; 452 } 453 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 454 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 455 boolean successfulAddition = false; 456 if (wait) { 457 try { 458 successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); 459 } catch (InterruptedException e) { 460 Thread.currentThread().interrupt(); 461 } 462 } else { 463 successfulAddition = bq.offer(re); 464 } 465 if (!successfulAddition) { 466 ramCache.remove(cacheKey); 467 cacheStats.failInsert(); 468 } else { 469 this.blockNumber.increment(); 470 this.heapSize.add(cachedItem.heapSize()); 471 blocksByHFile.add(cacheKey); 472 } 473 } 474 475 /** 476 * Get the buffer of the block with the specified key. 
477 * @param key block's cache key 478 * @param caching true if the caller caches blocks on cache misses 479 * @param repeat Whether this is a repeat lookup for the same block 480 * @param updateCacheMetrics Whether we should update cache metrics or not 481 * @return buffer of specified cache key, or null if not in cache 482 */ 483 @Override 484 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 485 boolean updateCacheMetrics) { 486 if (!cacheEnabled) { 487 return null; 488 } 489 RAMQueueEntry re = ramCache.get(key); 490 if (re != null) { 491 if (updateCacheMetrics) { 492 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 493 } 494 re.access(accessCount.incrementAndGet()); 495 return re.getData(); 496 } 497 BucketEntry bucketEntry = backingMap.get(key); 498 if (bucketEntry != null) { 499 long start = System.nanoTime(); 500 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 501 try { 502 lock.readLock().lock(); 503 // We can not read here even if backingMap does contain the given key because its offset 504 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 505 // existence here. 
506 if (bucketEntry.equals(backingMap.get(key))) { 507 // TODO : change this area - should be removed after server cells and 508 // 12295 are available 509 int len = bucketEntry.getLength(); 510 if (LOG.isTraceEnabled()) { 511 LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len); 512 } 513 Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len, 514 bucketEntry.deserializerReference(this.deserialiserMap)); 515 long timeTaken = System.nanoTime() - start; 516 if (updateCacheMetrics) { 517 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 518 cacheStats.ioHit(timeTaken); 519 } 520 if (cachedBlock.getMemoryType() == MemoryType.SHARED) { 521 bucketEntry.incrementRefCountAndGet(); 522 } 523 bucketEntry.access(accessCount.incrementAndGet()); 524 if (this.ioErrorStartTime > 0) { 525 ioErrorStartTime = -1; 526 } 527 return cachedBlock; 528 } 529 } catch (IOException ioex) { 530 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 531 checkIOErrorIsTolerated(); 532 } finally { 533 lock.readLock().unlock(); 534 } 535 } 536 if (!repeat && updateCacheMetrics) { 537 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 538 } 539 return null; 540 } 541 542 @VisibleForTesting 543 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { 544 bucketAllocator.freeBlock(bucketEntry.offset()); 545 realCacheSize.add(-1 * bucketEntry.getLength()); 546 blocksByHFile.remove(cacheKey); 547 if (decrementBlockNumber) { 548 this.blockNumber.decrement(); 549 } 550 } 551 552 @Override 553 public boolean evictBlock(BlockCacheKey cacheKey) { 554 return evictBlock(cacheKey, true); 555 } 556 557 private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) { 558 RAMQueueEntry removedBlock = ramCache.remove(cacheKey); 559 if (removedBlock != null) { 560 this.blockNumber.decrement(); 561 this.heapSize.add(-1 * removedBlock.getData().heapSize()); 562 } 563 return removedBlock; 564 } 565 566 public boolean 
evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) { 567 if (!cacheEnabled) { 568 return false; 569 } 570 RAMQueueEntry removedBlock = checkRamCache(cacheKey); 571 BucketEntry bucketEntry = backingMap.get(cacheKey); 572 if (bucketEntry == null) { 573 if (removedBlock != null) { 574 cacheStats.evicted(0, cacheKey.isPrimary()); 575 return true; 576 } else { 577 return false; 578 } 579 } 580 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 581 try { 582 lock.writeLock().lock(); 583 int refCount = bucketEntry.getRefCount(); 584 if (refCount == 0) { 585 if (backingMap.remove(cacheKey, bucketEntry)) { 586 blockEvicted(cacheKey, bucketEntry, removedBlock == null); 587 } else { 588 return false; 589 } 590 } else { 591 if(!deletedBlock) { 592 if (LOG.isDebugEnabled()) { 593 LOG.debug("This block " + cacheKey + " is still referred by " + refCount 594 + " readers. Can not be freed now"); 595 } 596 return false; 597 } else { 598 if (LOG.isDebugEnabled()) { 599 LOG.debug("This block " + cacheKey + " is still referred by " + refCount 600 + " readers. Can not be freed now. Hence will mark this" 601 + " for evicting at a later point"); 602 } 603 bucketEntry.markForEvict(); 604 } 605 } 606 } finally { 607 lock.writeLock().unlock(); 608 } 609 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 610 return true; 611 } 612 613 /* 614 * Statistics thread. Periodically output cache statistics to the log. 
615 */ 616 private static class StatisticsThread extends Thread { 617 private final BucketCache bucketCache; 618 619 public StatisticsThread(BucketCache bucketCache) { 620 super("BucketCacheStatsThread"); 621 setDaemon(true); 622 this.bucketCache = bucketCache; 623 } 624 625 @Override 626 public void run() { 627 bucketCache.logStats(); 628 } 629 } 630 631 public void logStats() { 632 long totalSize = bucketAllocator.getTotalSize(); 633 long usedSize = bucketAllocator.getUsedSize(); 634 long freeSize = totalSize - usedSize; 635 long cacheSize = getRealCacheSize(); 636 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + 637 "totalSize=" + StringUtils.byteDesc(totalSize) + ", " + 638 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + 639 "usedSize=" + StringUtils.byteDesc(usedSize) +", " + 640 "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " + 641 "accesses=" + cacheStats.getRequestCount() + ", " + 642 "hits=" + cacheStats.getHitCount() + ", " + 643 "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " + 644 "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " + 645 "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : 646 (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + 647 "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + 648 "cachingHits=" + cacheStats.getHitCachingCount() + ", " + 649 "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? 
"0," : 650 (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + 651 "evictions=" + cacheStats.getEvictionCount() + ", " + 652 "evicted=" + cacheStats.getEvictedCount() + ", " + 653 "evictedPerRun=" + cacheStats.evictedPerEviction()); 654 cacheStats.reset(); 655 } 656 657 public long getRealCacheSize() { 658 return this.realCacheSize.sum(); 659 } 660 661 private long acceptableSize() { 662 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 663 } 664 665 @VisibleForTesting 666 long getPartitionSize(float partitionFactor) { 667 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 668 } 669 670 /** 671 * Return the count of bucketSizeinfos still need free space 672 */ 673 private int bucketSizesAboveThresholdCount(float minFactor) { 674 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 675 int fullCount = 0; 676 for (int i = 0; i < stats.length; i++) { 677 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 678 freeGoal = Math.max(freeGoal, 1); 679 if (stats[i].freeCount() < freeGoal) { 680 fullCount++; 681 } 682 } 683 return fullCount; 684 } 685 686 /** 687 * This method will find the buckets that are minimally occupied 688 * and are not reference counted and will free them completely 689 * without any constraint on the access times of the elements, 690 * and as a process will completely free at most the number of buckets 691 * passed, sometimes it might not due to changing refCounts 692 * 693 * @param completelyFreeBucketsNeeded number of buckets to free 694 **/ 695 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 696 if (completelyFreeBucketsNeeded != 0) { 697 // First we will build a set where the offsets are reference counted, usually 698 // this set is small around O(Handler Count) unless something else is wrong 699 Set<Integer> inUseBuckets = new HashSet<Integer>(); 700 for (BucketEntry entry : 
backingMap.values()) { 701 if (entry.getRefCount() != 0) { 702 inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset())); 703 } 704 } 705 706 Set<Integer> candidateBuckets = bucketAllocator.getLeastFilledBuckets( 707 inUseBuckets, completelyFreeBucketsNeeded); 708 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 709 if (candidateBuckets.contains(bucketAllocator 710 .getBucketIndex(entry.getValue().offset()))) { 711 evictBlock(entry.getKey(), false); 712 } 713 } 714 } 715 } 716 717 /** 718 * Free the space if the used size reaches acceptableSize() or one size block 719 * couldn't be allocated. When freeing the space, we use the LRU algorithm and 720 * ensure there must be some blocks evicted 721 * @param why Why we are being called 722 */ 723 private void freeSpace(final String why) { 724 // Ensure only one freeSpace progress at a time 725 if (!freeSpaceLock.tryLock()) { 726 return; 727 } 728 try { 729 freeInProgress = true; 730 long bytesToFreeWithoutExtra = 0; 731 // Calculate free byte for each bucketSizeinfo 732 StringBuilder msgBuffer = LOG.isDebugEnabled()? 
new StringBuilder(): null; 733 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 734 long[] bytesToFreeForBucket = new long[stats.length]; 735 for (int i = 0; i < stats.length; i++) { 736 bytesToFreeForBucket[i] = 0; 737 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 738 freeGoal = Math.max(freeGoal, 1); 739 if (stats[i].freeCount() < freeGoal) { 740 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 741 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 742 if (msgBuffer != null) { 743 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 744 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 745 } 746 } 747 } 748 if (msgBuffer != null) { 749 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 750 } 751 752 if (bytesToFreeWithoutExtra <= 0) { 753 return; 754 } 755 long currentSize = bucketAllocator.getUsedSize(); 756 long totalSize = bucketAllocator.getTotalSize(); 757 if (LOG.isDebugEnabled() && msgBuffer != null) { 758 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + 759 " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + 760 StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize)); 761 } 762 763 long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra 764 * (1 + extraFreeFactor)); 765 766 // Instantiate priority buckets 767 BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra, 768 blockSize, getPartitionSize(singleFactor)); 769 BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra, 770 blockSize, getPartitionSize(multiFactor)); 771 BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra, 772 blockSize, getPartitionSize(memoryFactor)); 773 774 // Scan entire map putting bucket entry into appropriate bucket entry 775 // group 776 for 
(Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 777 switch (bucketEntryWithKey.getValue().getPriority()) { 778 case SINGLE: { 779 bucketSingle.add(bucketEntryWithKey); 780 break; 781 } 782 case MULTI: { 783 bucketMulti.add(bucketEntryWithKey); 784 break; 785 } 786 case MEMORY: { 787 bucketMemory.add(bucketEntryWithKey); 788 break; 789 } 790 } 791 } 792 793 PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<>(3, 794 Comparator.comparingLong(BucketEntryGroup::overflow)); 795 796 bucketQueue.add(bucketSingle); 797 bucketQueue.add(bucketMulti); 798 bucketQueue.add(bucketMemory); 799 800 int remainingBuckets = bucketQueue.size(); 801 long bytesFreed = 0; 802 803 BucketEntryGroup bucketGroup; 804 while ((bucketGroup = bucketQueue.poll()) != null) { 805 long overflow = bucketGroup.overflow(); 806 if (overflow > 0) { 807 long bucketBytesToFree = Math.min(overflow, 808 (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 809 bytesFreed += bucketGroup.free(bucketBytesToFree); 810 } 811 remainingBuckets--; 812 } 813 814 // Check and free if there are buckets that still need freeing of space 815 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 816 bucketQueue.clear(); 817 remainingBuckets = 3; 818 819 bucketQueue.add(bucketSingle); 820 bucketQueue.add(bucketMulti); 821 bucketQueue.add(bucketMemory); 822 823 while ((bucketGroup = bucketQueue.poll()) != null) { 824 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 825 bytesFreed += bucketGroup.free(bucketBytesToFree); 826 remainingBuckets--; 827 } 828 } 829 830 // Even after the above free we might still need freeing because of the 831 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 832 // there might be some buckets where the occupancy is very sparse and thus are not 833 // yielding the free for the other bucket sizes, the fix for this to evict some 834 // of the buckets, we do this by evicting the 
buckets that are least fulled 835 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * 836 bucketSizesAboveThresholdCount(1.0f)); 837 838 if (LOG.isDebugEnabled()) { 839 long single = bucketSingle.totalSize(); 840 long multi = bucketMulti.totalSize(); 841 long memory = bucketMemory.totalSize(); 842 if (LOG.isDebugEnabled()) { 843 LOG.debug("Bucket cache free space completed; " + "freed=" 844 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" 845 + StringUtils.byteDesc(totalSize) + ", " + "single=" 846 + StringUtils.byteDesc(single) + ", " + "multi=" 847 + StringUtils.byteDesc(multi) + ", " + "memory=" 848 + StringUtils.byteDesc(memory)); 849 } 850 } 851 852 } catch (Throwable t) { 853 LOG.warn("Failed freeing space", t); 854 } finally { 855 cacheStats.evict(); 856 freeInProgress = false; 857 freeSpaceLock.unlock(); 858 } 859 } 860 861 // This handles flushing the RAM cache to IOEngine. 862 @VisibleForTesting 863 class WriterThread extends HasThread { 864 private final BlockingQueue<RAMQueueEntry> inputQueue; 865 private volatile boolean writerEnabled = true; 866 867 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 868 super("BucketCacheWriterThread"); 869 this.inputQueue = queue; 870 } 871 872 // Used for test 873 @VisibleForTesting 874 void disableWriter() { 875 this.writerEnabled = false; 876 } 877 878 @Override 879 public void run() { 880 List<RAMQueueEntry> entries = new ArrayList<>(); 881 try { 882 while (cacheEnabled && writerEnabled) { 883 try { 884 try { 885 // Blocks 886 entries = getRAMQueueEntries(inputQueue, entries); 887 } catch (InterruptedException ie) { 888 if (!cacheEnabled) break; 889 } 890 doDrain(entries); 891 } catch (Exception ioe) { 892 LOG.error("WriterThread encountered error", ioe); 893 } 894 } 895 } catch (Throwable t) { 896 LOG.warn("Failed doing drain", t); 897 } 898 LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); 899 } 900 901 /** 902 * Put the new bucket entry into backingMap. 
Notice that we are allowed to replace the existing 903 * cache with a new block for the same cache key. there's a corner case: one thread cache a 904 * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another 905 * new block with the same cache key do the same thing for the same cache key, so if not evict 906 * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone 907 * but the bucketAllocator do not free its memory. 908 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 909 * cacheKey, Cacheable newBlock) 910 * @param key Block cache key 911 * @param bucketEntry Bucket entry to put into backingMap. 912 */ 913 private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 914 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 915 if (previousEntry != null && previousEntry != bucketEntry) { 916 ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset()); 917 lock.writeLock().lock(); 918 try { 919 blockEvicted(key, previousEntry, false); 920 } finally { 921 lock.writeLock().unlock(); 922 } 923 } 924 } 925 926 /** 927 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. 928 * Process all that are passed in even if failure being sure to remove from ramCache else we'll 929 * never undo the references and we'll OOME. 930 * @param entries Presumes list passed in here will be processed by this invocation only. No 931 * interference expected. 932 * @throws InterruptedException 933 */ 934 @VisibleForTesting 935 void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException { 936 if (entries.isEmpty()) { 937 return; 938 } 939 // This method is a little hard to follow. We run through the passed in entries and for each 940 // successful add, we add a non-null BucketEntry to the below bucketEntries. 
Later we must 941 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 942 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 943 // filling ramCache. We do the clean up by again running through the passed in entries 944 // doing extra work when we find a non-null bucketEntries corresponding entry. 945 final int size = entries.size(); 946 BucketEntry[] bucketEntries = new BucketEntry[size]; 947 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 948 // when we go to add an entry by going around the loop again without upping the index. 949 int index = 0; 950 while (cacheEnabled && index < size) { 951 RAMQueueEntry re = null; 952 try { 953 re = entries.get(index); 954 if (re == null) { 955 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 956 index++; 957 continue; 958 } 959 BucketEntry bucketEntry = 960 re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize); 961 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 962 bucketEntries[index] = bucketEntry; 963 if (ioErrorStartTime > 0) { 964 ioErrorStartTime = -1; 965 } 966 index++; 967 } catch (BucketAllocatorException fle) { 968 LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle); 969 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 970 bucketEntries[index] = null; 971 index++; 972 } catch (CacheFullException cfe) { 973 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 974 if (!freeInProgress) { 975 freeSpace("Full!"); 976 } else { 977 Thread.sleep(50); 978 } 979 } catch (IOException ioex) { 980 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 
981 LOG.error("Failed writing to bucket cache", ioex); 982 checkIOErrorIsTolerated(); 983 } 984 } 985 986 // Make sure data pages are written on media before we update maps. 987 try { 988 ioEngine.sync(); 989 } catch (IOException ioex) { 990 LOG.error("Failed syncing IO engine", ioex); 991 checkIOErrorIsTolerated(); 992 // Since we failed sync, free the blocks in bucket allocator 993 for (int i = 0; i < entries.size(); ++i) { 994 if (bucketEntries[i] != null) { 995 bucketAllocator.freeBlock(bucketEntries[i].offset()); 996 bucketEntries[i] = null; 997 } 998 } 999 } 1000 1001 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1002 // success or error. 1003 for (int i = 0; i < size; ++i) { 1004 BlockCacheKey key = entries.get(i).getKey(); 1005 // Only add if non-null entry. 1006 if (bucketEntries[i] != null) { 1007 putIntoBackingMap(key, bucketEntries[i]); 1008 } 1009 // Always remove from ramCache even if we failed adding it to the block cache above. 1010 RAMQueueEntry ramCacheEntry = ramCache.remove(key); 1011 if (ramCacheEntry != null) { 1012 heapSize.add(-1 * entries.get(i).getData().heapSize()); 1013 } else if (bucketEntries[i] != null){ 1014 // Block should have already been evicted. Remove it and free space. 
1015 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset()); 1016 try { 1017 lock.writeLock().lock(); 1018 int refCount = bucketEntries[i].getRefCount(); 1019 if (refCount == 0) { 1020 if (backingMap.remove(key, bucketEntries[i])) { 1021 blockEvicted(key, bucketEntries[i], false); 1022 } else { 1023 bucketEntries[i].markForEvict(); 1024 } 1025 } else { 1026 bucketEntries[i].markForEvict(); 1027 } 1028 } finally { 1029 lock.writeLock().unlock(); 1030 } 1031 } 1032 } 1033 1034 long used = bucketAllocator.getUsedSize(); 1035 if (used > acceptableSize()) { 1036 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1037 } 1038 return; 1039 } 1040 } 1041 1042 /** 1043 * Blocks until elements available in {@code q} then tries to grab as many as possible 1044 * before returning. 1045 * @param receptacle Where to stash the elements taken from queue. We clear before we use it 1046 * just in case. 1047 * @param q The queue to take from. 1048 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1049 */ 1050 @VisibleForTesting 1051 static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q, 1052 final List<RAMQueueEntry> receptacle) 1053 throws InterruptedException { 1054 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1055 // ok even if list grew to accommodate thousands. 
1056 receptacle.clear(); 1057 receptacle.add(q.take()); 1058 q.drainTo(receptacle); 1059 return receptacle; 1060 } 1061 1062 private void persistToFile() throws IOException { 1063 assert !cacheEnabled; 1064 FileOutputStream fos = null; 1065 ObjectOutputStream oos = null; 1066 try { 1067 if (!ioEngine.isPersistent()) { 1068 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1069 } 1070 fos = new FileOutputStream(persistencePath, false); 1071 oos = new ObjectOutputStream(fos); 1072 oos.writeLong(cacheCapacity); 1073 oos.writeUTF(ioEngine.getClass().getName()); 1074 oos.writeUTF(backingMap.getClass().getName()); 1075 oos.writeObject(deserialiserMap); 1076 oos.writeObject(backingMap); 1077 } finally { 1078 if (oos != null) oos.close(); 1079 if (fos != null) fos.close(); 1080 } 1081 } 1082 1083 @SuppressWarnings("unchecked") 1084 private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException, 1085 ClassNotFoundException { 1086 File persistenceFile = new File(persistencePath); 1087 if (!persistenceFile.exists()) { 1088 return; 1089 } 1090 assert !cacheEnabled; 1091 FileInputStream fis = null; 1092 ObjectInputStream ois = null; 1093 try { 1094 if (!ioEngine.isPersistent()) 1095 throw new IOException( 1096 "Attempt to restore non-persistent cache mappings!"); 1097 fis = new FileInputStream(persistencePath); 1098 ois = new ObjectInputStream(fis); 1099 long capacitySize = ois.readLong(); 1100 if (capacitySize != cacheCapacity) 1101 throw new IOException("Mismatched cache capacity:" 1102 + StringUtils.byteDesc(capacitySize) + ", expected: " 1103 + StringUtils.byteDesc(cacheCapacity)); 1104 String ioclass = ois.readUTF(); 1105 String mapclass = ois.readUTF(); 1106 if (!ioEngine.getClass().getName().equals(ioclass)) 1107 throw new IOException("Class name for IO engine mismatch: " + ioclass 1108 + ", expected:" + ioEngine.getClass().getName()); 1109 if (!backingMap.getClass().getName().equals(mapclass)) 1110 throw new 
IOException("Class name for cache map mismatch: " + mapclass 1111 + ", expected:" + backingMap.getClass().getName()); 1112 UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois 1113 .readObject(); 1114 ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile = 1115 (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject(); 1116 BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes, 1117 backingMapFromFile, realCacheSize); 1118 bucketAllocator = allocator; 1119 deserialiserMap = deserMap; 1120 backingMap = backingMapFromFile; 1121 } finally { 1122 if (ois != null) ois.close(); 1123 if (fis != null) fis.close(); 1124 if (!persistenceFile.delete()) { 1125 throw new IOException("Failed deleting persistence file " 1126 + persistenceFile.getAbsolutePath()); 1127 } 1128 } 1129 } 1130 1131 /** 1132 * Check whether we tolerate IO error this time. If the duration of IOEngine 1133 * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the 1134 * cache 1135 */ 1136 private void checkIOErrorIsTolerated() { 1137 long now = EnvironmentEdgeManager.currentTime(); 1138 if (this.ioErrorStartTime > 0) { 1139 if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) { 1140 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration + 1141 "ms, disabling cache, please check your IOEngine"); 1142 disableCache(); 1143 } 1144 } else { 1145 this.ioErrorStartTime = now; 1146 } 1147 } 1148 1149 /** 1150 * Used to shut down the cache -or- turn it off in the case of something broken. 
1151 */ 1152 private void disableCache() { 1153 if (!cacheEnabled) return; 1154 cacheEnabled = false; 1155 ioEngine.shutdown(); 1156 this.scheduleThreadPool.shutdown(); 1157 for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt(); 1158 this.ramCache.clear(); 1159 if (!ioEngine.isPersistent() || persistencePath == null) { 1160 // If persistent ioengine and a path, we will serialize out the backingMap. 1161 this.backingMap.clear(); 1162 } 1163 } 1164 1165 private void join() throws InterruptedException { 1166 for (int i = 0; i < writerThreads.length; ++i) 1167 writerThreads[i].join(); 1168 } 1169 1170 @Override 1171 public void shutdown() { 1172 disableCache(); 1173 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() 1174 + "; path to write=" + persistencePath); 1175 if (ioEngine.isPersistent() && persistencePath != null) { 1176 try { 1177 join(); 1178 persistToFile(); 1179 } catch (IOException ex) { 1180 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1181 } catch (InterruptedException e) { 1182 LOG.warn("Failed to persist data on exit", e); 1183 } 1184 } 1185 } 1186 1187 @Override 1188 public CacheStats getStats() { 1189 return cacheStats; 1190 } 1191 1192 public BucketAllocator getAllocator() { 1193 return this.bucketAllocator; 1194 } 1195 1196 @Override 1197 public long heapSize() { 1198 return this.heapSize.sum(); 1199 } 1200 1201 @Override 1202 public long size() { 1203 return this.realCacheSize.sum(); 1204 } 1205 1206 @Override 1207 public long getCurrentDataSize() { 1208 return size(); 1209 } 1210 1211 @Override 1212 public long getFreeSize() { 1213 return this.bucketAllocator.getFreeSize(); 1214 } 1215 1216 @Override 1217 public long getBlockCount() { 1218 return this.blockNumber.sum(); 1219 } 1220 1221 @Override 1222 public long getDataBlockCount() { 1223 return getBlockCount(); 1224 } 1225 1226 @Override 1227 public long getCurrentSize() { 1228 return this.bucketAllocator.getUsedSize(); 1229 } 

  /**
   * Evicts all blocks for a specific HFile.
   * <p>
   * This is used for evict-on-close to remove all blocks of a specific HFile.
   *
   * @param hfileName name of the HFile whose blocks should be evicted
   * @return the number of blocks evicted
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    // View of blocksByHFile bounded by (hfileName, MIN offset) .. (hfileName, MAX offset);
    // presumably BlockCacheKey orders by file name then offset, so this is every cached key
    // of the file — confirm against BlockCacheKey's ordering.
    Set<BlockCacheKey> keySet = blocksByHFile.subSet(
        new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
        new BlockCacheKey(hfileName, Long.MAX_VALUE), true);

    int numEvicted = 0;
    for (BlockCacheKey key : keySet) {
      if (evictBlock(key)) {
          ++numEvicted;
      }
    }

    return numEvicted;
  }

  /**
   * Item in cache. We expect this to be where most memory goes. Java uses 8
   * bytes just for object headers; after this, we want to use as little as
   * possible - so we only use 8 bytes, but in order to do so we end up messing
   * around with all this Java casting stuff. Offset stored as 5 bytes that make
   * up the long. Doubt we'll see devices this big for ages. Offsets are divided
   * by 256. So 5 bytes gives us 256TB or so.
   * <p>
   * Instances are Serializable and are written out as the values of backingMap by
   * persistToFile; field names and types are part of that persisted format.
   */
  static class BucketEntry implements Serializable {
    private static final long serialVersionUID = -6741504807982257534L;

    // access counter comparator, descending order (higher accessCounter sorts first)
    static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {

      @Override
      public int compare(BucketEntry o1, BucketEntry o2) {
        return Long.compare(o2.accessCounter, o1.accessCounter);
      }
    };

    // Low 32 bits of (offset >> 8).
    private int offsetBase;
    private int length;
    // Bits 32..39 of (offset >> 8).
    private byte offset1;
    // Index into the deserialiser UniqueIndexMap (see setDeserialiserReference).
    byte deserialiserIndex;
    private volatile long accessCounter;
    private BlockPriority priority;

    /**
     * Time this block was cached. Presumes we are created just before we are added to the cache.
     * Taken from System.nanoTime(), so only meaningful relative to other nanoTime values.
     */
    private final long cachedTime = System.nanoTime();

    /**
     * @param offset byte offset in the IOEngine; must be a multiple of 256 (asserted)
     * @param length serialized length of the block
     * @param accessCounter initial access counter value
     * @param inMemory true to start at MEMORY priority, otherwise SINGLE
     */
    BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
      setOffset(offset);
      this.length = length;
      this.accessCounter = accessCounter;
      if (inMemory) {
        this.priority = BlockPriority.MEMORY;
      } else {
        this.priority = BlockPriority.SINGLE;
      }
    }

    /** Reassembles the 40-bit stored value and scales it back up by 256. */
    long offset() { // Java has no unsigned numbers
      long o = ((long) offsetBase) & 0xFFFFFFFFL; //This needs the L cast otherwise it will be sign extended as a negative number.
      o += (((long) (offset1)) & 0xFF) << 32; //The 0xFF here does not need the L cast because it is treated as a positive int.
      return o << 8;
    }

    /** Stores {@code value / 256} split across offsetBase (low 32 bits) and offset1 (next 8). */
    private void setOffset(long value) {
      assert (value & 0xFF) == 0;
      value >>= 8;
      offsetBase = (int) value;
      offset1 = (byte) (value >> 32);
    }

    public int getLength() {
      return length;
    }

    /** Looks up the deserializer whose identifier was mapped to deserialiserIndex. */
    protected CacheableDeserializer<Cacheable> deserializerReference(
        UniqueIndexMap<Integer> deserialiserMap) {
      return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
          .unmap(deserialiserIndex));
    }

    /**
     * Records the deserializer by mapping its identifier to a small index.
     * NOTE(review): the index is narrowed to a byte, which assumes the number of distinct
     * deserializers stays within byte range — confirm.
     */
    protected void setDeserialiserReference(
        CacheableDeserializer<Cacheable> deserializer,
        UniqueIndexMap<Integer> deserialiserMap) {
      this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
          .getDeserialiserIdentifier()));
    }

    /**
     * Block has been accessed. Update its local access counter and promote a
     * SINGLE-priority entry to MULTI.
     */
    public void access(long accessCounter) {
      this.accessCounter = accessCounter;
      if (this.priority == BlockPriority.SINGLE) {
        this.priority = BlockPriority.MULTI;
      }
    }

    public BlockPriority getPriority() {
      return this.priority;
    }

    public long getCachedTime() {
      return cachedTime;
    }

    // Reference counting and evict-marking are no-ops here; SharedMemoryBucketEntry
    // overrides them.
    protected int getRefCount() {
      return 0;
    }

    protected int incrementRefCountAndGet() {
      return 0;
    }

    protected int decrementRefCountAndGet() {
      return 0;
    }

    protected boolean isMarkedForEvict() {
      return false;
    }

    protected void markForEvict() {
      // noop;
    }
  }

  /**
   * BucketEntry variant that tracks a reference count and an evict-pending flag, created
   * when the IOEngine uses shared memory (see RAMQueueEntry#getBucketEntry).
   */
  static class SharedMemoryBucketEntry extends BucketEntry {
    private static final long serialVersionUID = -2187147283772338481L;

    // Set this when we were not able to forcefully evict the block
    private volatile boolean markedForEvict;
    private AtomicInteger refCount = new AtomicInteger(0);

    SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
      super(offset, length, accessCounter, inMemory);
    }

    @Override
    protected int getRefCount() {
      return this.refCount.get();
    }

    @Override
    protected int incrementRefCountAndGet() {
      return this.refCount.incrementAndGet();
    }

    @Override
    protected int decrementRefCountAndGet() {
      return this.refCount.decrementAndGet();
    }

    @Override
    protected boolean isMarkedForEvict() {
      return this.markedForEvict;
    }

    @Override
    protected void markForEvict() {
      this.markedForEvict = true;
    }
  }

  /**
   * Used to group bucket entries into priority buckets. There will be a
   * BucketEntryGroup for each priority (single, multi, memory). Once bucketed,
   * the eviction algorithm takes the appropriate number of elements out of each
   * according to configuration parameters and their relative sizes.
   */
  private class BucketEntryGroup {

    // NOTE(review): CachedEntryQueue is not visible here; presumably it keeps enough
    // candidates to cover bytesToFree, with pollLast() yielding the next one to evict.
    private CachedEntryQueue queue;
    // Sum of getLength() of every entry added.
    private long totalSize = 0;
    // Target size for this priority group; overflow() is measured against it.
    private long bucketSize;

    public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) {
      this.bucketSize = bucketSize;
      queue = new CachedEntryQueue(bytesToFree, blockSize);
      totalSize = 0;
    }

    public void add(Map.Entry<BlockCacheKey, BucketEntry> block) {
      totalSize += block.getValue().getLength();
      queue.add(block);
    }

    /**
     * Evicts queued entries until at least {@code toFree} bytes were freed or the queue is
     * exhausted.
     * @return bytes actually freed (only entries whose evictBlock call succeeded count)
     */
    public long free(long toFree) {
      Map.Entry<BlockCacheKey, BucketEntry> entry;
      long freedBytes = 0;
      // TODO avoid a cycling situation. We find no block which is not in use and so no way to free
      // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
      while ((entry = queue.pollLast()) != null) {
        if (evictBlock(entry.getKey(), false)) {
          freedBytes += entry.getValue().getLength();
        }
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      return freedBytes;
    }

    /** Bytes by which this group exceeds its target size (negative when under). */
    public long overflow() {
      return totalSize - bucketSize;
    }

    public long totalSize() {
      return totalSize;
    }
  }

  /**
   * Block Entry stored in the memory with key,data and so on
   */
  @VisibleForTesting
  static class RAMQueueEntry {
    private BlockCacheKey key;
    private Cacheable data;
    private long accessCounter;
    private boolean inMemory;

    public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
        boolean inMemory) {
      this.key = bck;
      this.data = data;
      this.accessCounter = accessCounter;
      this.inMemory = inMemory;
    }

    public Cacheable getData() {
      return data;
    }

    public BlockCacheKey getKey() {
      return key;
    }

    public void access(long accessCounter) {
      this.accessCounter = accessCounter;
    }

    /**
     * Picks the BucketEntry implementation: ref-counted variants for shared-memory engines
     * (the Unsafe-based one when Unsafe is available), the plain one otherwise.
     */
    private BucketEntry getBucketEntry(IOEngine ioEngine, long offset, int len) {
      if (ioEngine.usesSharedMemory()) {
        if (UnsafeAvailChecker.isAvailable()) {
          return new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
        } else {
          return new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory);
        }
      } else {
        return new BucketEntry(offset, len, accessCounter, inMemory);
      }
    }

    /**
     * Allocates space for this entry's data, writes it to the IOEngine and returns the new
     * BucketEntry. If anything fails after allocation, the allocated block is freed before the
     * exception propagates. On success the serialized length is added to realCacheSize.
     * @return the new BucketEntry, or null if the data has a serialized length of 0
     * @throws IOException from allocation or from the IOEngine write
     */
    public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator,
        final UniqueIndexMap<Integer> deserialiserMap, final LongAdder realCacheSize)
        throws IOException {
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized
      if (len == 0) {
        return null;
      }
      long offset = bucketAllocator.allocateBlock(len);
      boolean succ = false;
      BucketEntry bucketEntry;
      try {
        bucketEntry = getBucketEntry(ioEngine, offset, len);
        bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
        if (data instanceof HFileBlock) {
          // If an instance of HFileBlock, save on some allocations.
          // The block buffer is written at offset and its metadata at the tail of the slot.
          HFileBlock block = (HFileBlock) data;
          ByteBuff sliceBuf = block.getBufferReadOnly();
          ByteBuffer metadata = block.getMetaData();
          ioEngine.write(sliceBuf, offset);
          ioEngine.write(metadata, offset + len - metadata.limit());
        } else {
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb, true);
          ioEngine.write(bb, offset);
        }
        succ = true;
      } finally {
        if (!succ) {
          bucketAllocator.freeBlock(offset);
        }
      }
      realCacheSize.add(len);
      return bucketEntry;
    }
  }

  /**
   * Only used in test. Disables, interrupts and joins every writer thread in turn.
   * @throws InterruptedException if interrupted while joining
   */
  void stopWriterThreads() throws InterruptedException {
    for (WriterThread writerThread : writerThreads) {
      writerThread.disableWriter();
      writerThread.interrupt();
      writerThread.join();
    }
  }

  /**
   * Iterates the blocks in backingMap as CachedBlock views. remove() is unsupported.
   */
  @Override
  public Iterator<CachedBlock> iterator() {
    // Don't bother with ramcache since stuff is in here only a little while.
    final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i =
        this.backingMap.entrySet().iterator();
    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return i.hasNext();
      }

      @Override
      public CachedBlock next() {
        final Map.Entry<BlockCacheKey, BucketEntry> e = i.next();
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return e.getValue().getPriority();
          }

          @Override
          public BlockType getBlockType() {
            // Not held by BucketEntry.  Could add it if wanted on BucketEntry creation.
            return null;
          }

          @Override
          public long getOffset() {
            return e.getKey().getOffset();
          }

          @Override
          public long getSize() {
            return e.getValue().getLength();
          }

          @Override
          public long getCachedTime() {
            return e.getValue().getCachedTime();
          }

          @Override
          public String getFilename() {
            return e.getKey().getHfileName();
          }

          /** Orders by file name, then offset, then most recently cached first. */
          @Override
          public int compareTo(CachedBlock other) {
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) return diff;

            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) return diff;
            // NOTE(review): cached times come from System.nanoTime(), which the JDK documents
            // may legitimately be negative, so this guard could reject valid entries — confirm.
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException("" + this.getCachedTime() + ", " +
                other.getCachedTime());
            }
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          @Override
          public int hashCode() {
            return e.getKey().hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock)obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  /** Returns null: this cache exposes no sub-caches. */
  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }

  /**
   * Releases a reference to a SHARED-memory block. When the count drops to zero and the
   * entry was previously marked for eviction, the block is evicted now.
   */
  @Override
  public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
    if (block.getMemoryType() == MemoryType.SHARED) {
      BucketEntry bucketEntry = backingMap.get(cacheKey);
      if (bucketEntry != null) {
        int refCount = bucketEntry.decrementRefCountAndGet();
        if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
          evictBlock(cacheKey);
        }
      }
    }
  }

  /** @return the reference count of the cached entry, or 0 if the key is not in backingMap */
  @VisibleForTesting
  public int getRefCount(BlockCacheKey cacheKey) {
    BucketEntry bucketEntry = backingMap.get(cacheKey);
    if (bucketEntry != null) {
      return bucketEntry.getRefCount();
    }
    return 0;
  }

  // Accessors for the configured eviction factors.
  float getAcceptableFactor() {
    return acceptableFactor;
  }

  float getMinFactor() {
    return minFactor;
  }

  float getExtraFreeFactor() {
    return extraFreeFactor;
  }

  float getSingleFactor() {
    return singleFactor;
  }

  float getMultiFactor() {
    return multiFactor;
  }

  float getMemoryFactor() {
    return memoryFactor;
  }
}