001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021package org.apache.hadoop.hbase.io.hfile.bucket; 022 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.FileNotFoundException; 026import java.io.FileOutputStream; 027import java.io.IOException; 028import java.io.ObjectInputStream; 029import java.io.ObjectOutputStream; 030import java.io.Serializable; 031import java.nio.ByteBuffer; 032import java.util.ArrayList; 033import java.util.Comparator; 034import java.util.HashSet; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.NavigableSet; 039import java.util.PriorityQueue; 040import java.util.Set; 041import java.util.concurrent.ArrayBlockingQueue; 042import java.util.concurrent.BlockingQueue; 043import java.util.concurrent.ConcurrentHashMap; 044import java.util.concurrent.ConcurrentMap; 045import java.util.concurrent.ConcurrentSkipListSet; 046import java.util.concurrent.Executors; 047import java.util.concurrent.ScheduledExecutorService; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.atomic.AtomicInteger; 050import java.util.concurrent.atomic.AtomicLong; 051import java.util.concurrent.atomic.LongAdder; 052import java.util.concurrent.locks.Lock; 053import java.util.concurrent.locks.ReentrantLock; 054import java.util.concurrent.locks.ReentrantReadWriteLock; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.hbase.HBaseConfiguration; 057import org.apache.hadoop.hbase.io.HeapSize; 058import org.apache.hadoop.hbase.io.hfile.BlockCache; 059import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 060import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 061import org.apache.hadoop.hbase.io.hfile.BlockPriority; 062import org.apache.hadoop.hbase.io.hfile.BlockType; 063import org.apache.hadoop.hbase.io.hfile.CacheStats; 064import org.apache.hadoop.hbase.io.hfile.Cacheable; 065import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; 066import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; 067import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; 068import org.apache.hadoop.hbase.io.hfile.CachedBlock; 069import org.apache.hadoop.hbase.io.hfile.HFileBlock; 070import org.apache.hadoop.hbase.nio.ByteBuff; 071import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 072import org.apache.hadoop.hbase.util.HasThread; 073import org.apache.hadoop.hbase.util.IdReadWriteLock; 074import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; 075import org.apache.hadoop.hbase.util.UnsafeAvailChecker; 076import org.apache.hadoop.util.StringUtils; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 082import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 083import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 084 085/** 086 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses 087 * BucketCache#ramCache and BucketCache#backingMap in order to 088 * determine if a given element is in the cache. The bucket cache can use on-heap or 089 * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to 090 * store/read the block data. 091 * 092 * <p>Eviction is via a similar algorithm as used in 093 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache} 094 * 095 * <p>BucketCache can be used as mainly a block cache (see 096 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with 097 * LruBlockCache to decrease CMS GC and heap fragmentation. 098 * 099 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store 100 * blocks) to enlarge cache space via 101 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache} 102 */ 103@InterfaceAudience.Private 104public class BucketCache implements BlockCache, HeapSize { 105 private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class); 106 107 /** Priority buckets config */ 108 static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor"; 109 static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor"; 110 static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor"; 111 static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor"; 112 static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor"; 113 static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor"; 114 115 /** Priority buckets */ 116 @VisibleForTesting 117 static final float DEFAULT_SINGLE_FACTOR = 0.25f; 118 static final float DEFAULT_MULTI_FACTOR = 0.50f; 119 static final float DEFAULT_MEMORY_FACTOR = 0.25f; 120 static final float DEFAULT_MIN_FACTOR = 0.85f; 121 122 private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f; 123 private static final float DEFAULT_ACCEPT_FACTOR = 0.95f; 124 125 // Number of blocks to clear for each of the bucket size that is full 126 private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2; 127 128 /** Statistics thread */ 129 private static final int statThreadPeriod = 5 * 60; 130 131 final static int DEFAULT_WRITER_THREADS = 3; 132 final static int DEFAULT_WRITER_QUEUE_ITEMS = 64; 133 134 // Store/read block data 135 transient final IOEngine ioEngine; 136 137 // Store the block in this map before writing it to cache 138 @VisibleForTesting 139 transient final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache; 140 // In this map, store the block's meta data like offset, length 141 @VisibleForTesting 142 transient ConcurrentMap<BlockCacheKey, BucketEntry> backingMap; 143 144 /** 145 * Flag if the cache is enabled or not... We shut it off if there are IO 146 * errors for some time, so that Bucket IO exceptions/errors don't bring down 147 * the HBase server. 148 */ 149 private volatile boolean cacheEnabled; 150 151 /** 152 * A list of writer queues. We have a queue per {@link WriterThread} we have running. 153 * In other words, the work adding blocks to the BucketCache is divided up amongst the 154 * running WriterThreads. Its done by taking hash of the cache key modulo queue count. 155 * WriterThread when it runs takes whatever has been recently added and 'drains' the entries 156 * to the BucketCache. It then updates the ramCache and backingMap accordingly. 157 */ 158 @VisibleForTesting 159 transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>(); 160 @VisibleForTesting 161 transient final WriterThread[] writerThreads; 162 163 /** Volatile boolean to track if free space is in process or not */ 164 private volatile boolean freeInProgress = false; 165 private transient final Lock freeSpaceLock = new ReentrantLock(); 166 167 private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>(); 168 169 private final LongAdder realCacheSize = new LongAdder(); 170 private final LongAdder heapSize = new LongAdder(); 171 /** Current number of cached elements */ 172 private final LongAdder blockNumber = new LongAdder(); 173 174 /** Cache access count (sequential ID) */ 175 private final AtomicLong accessCount = new AtomicLong(); 176 177 private static final int DEFAULT_CACHE_WAIT_TIME = 50; 178 // Used in test now. If the flag is false and the cache speed is very fast, 179 // bucket cache will skip some blocks when caching. If the flag is true, we 180 // will wait blocks flushed to IOEngine for some time when caching 181 boolean wait_when_cache = false; 182 183 private final BucketCacheStats cacheStats = new BucketCacheStats(); 184 185 private final String persistencePath; 186 private final long cacheCapacity; 187 /** Approximate block size */ 188 private final long blockSize; 189 190 /** Duration of IO errors tolerated before we disable cache, 1 min as default */ 191 private final int ioErrorsTolerationDuration; 192 // 1 min 193 public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000; 194 195 // Start time of first IO error when reading or writing IO Engine, it will be 196 // reset after a successful read/write. 197 private volatile long ioErrorStartTime = -1; 198 199 /** 200 * A ReentrantReadWriteLock to lock on a particular block identified by offset. 201 * The purpose of this is to avoid freeing the block which is being read. 202 * <p> 203 * Key set of offsets in BucketCache is limited so soft reference is the best choice here. 204 */ 205 @VisibleForTesting 206 transient final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT); 207 208 private final NavigableSet<BlockCacheKey> blocksByHFile = 209 new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() { 210 @Override 211 public int compare(BlockCacheKey a, BlockCacheKey b) { 212 int nameComparison = a.getHfileName().compareTo(b.getHfileName()); 213 if (nameComparison != 0) { 214 return nameComparison; 215 } 216 217 if (a.getOffset() == b.getOffset()) { 218 return 0; 219 } else if (a.getOffset() < b.getOffset()) { 220 return -1; 221 } 222 return 1; 223 } 224 }); 225 226 /** Statistics thread schedule pool (for heavy debugging, could remove) */ 227 private transient final ScheduledExecutorService scheduleThreadPool = 228 Executors.newScheduledThreadPool(1, 229 new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build()); 230 231 // Allocate or free space for the block 232 private transient BucketAllocator bucketAllocator; 233 234 /** Acceptable size of cache (no evictions if size < acceptable) */ 235 private float acceptableFactor; 236 237 /** Minimum threshold of cache (when evicting, evict until size < min) */ 238 private float minFactor; 239 240 /** Free this floating point factor of extra blocks when evicting. For example free the number of blocks requested * (1 + extraFreeFactor) */ 241 private float extraFreeFactor; 242 243 /** Single access bucket size */ 244 private float singleFactor; 245 246 /** Multiple access bucket size */ 247 private float multiFactor; 248 249 /** In-memory bucket size */ 250 private float memoryFactor; 251 252 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 253 int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException, 254 IOException { 255 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 256 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 257 } 258 259 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 260 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 261 Configuration conf) 262 throws FileNotFoundException, IOException { 263 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 264 this.writerThreads = new WriterThread[writerThreadNum]; 265 long blockNumCapacity = capacity / blockSize; 266 if (blockNumCapacity >= Integer.MAX_VALUE) { 267 // Enough for about 32TB of cache! 268 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 269 } 270 271 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 272 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 273 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 274 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 275 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 276 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 277 278 sanityCheckConfigs(); 279 280 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor + 281 ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor + 282 ", memoryFactor: " + memoryFactor); 283 284 this.cacheCapacity = capacity; 285 this.persistencePath = persistencePath; 286 this.blockSize = blockSize; 287 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 288 289 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 290 for (int i = 0; i < writerThreads.length; ++i) { 291 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 292 } 293 294 assert writerQueues.size() == writerThreads.length; 295 this.ramCache = new ConcurrentHashMap<>(); 296 297 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 298 299 if (ioEngine.isPersistent() && persistencePath != null) { 300 try { 301 retrieveFromFile(bucketSizes); 302 } catch (IOException ioex) { 303 LOG.error("Can't restore from file because of", ioex); 304 } catch (ClassNotFoundException cnfe) { 305 LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe); 306 throw new RuntimeException(cnfe); 307 } 308 } 309 final String threadName = Thread.currentThread().getName(); 310 this.cacheEnabled = true; 311 for (int i = 0; i < writerThreads.length; ++i) { 312 writerThreads[i] = new WriterThread(writerQueues.get(i)); 313 writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i); 314 writerThreads[i].setDaemon(true); 315 } 316 startWriterThreads(); 317 318 // Run the statistics thread periodically to print the cache statistics log 319 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 320 // every five minutes. 321 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), 322 statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); 323 LOG.info("Started bucket cache; ioengine=" + ioEngineName + 324 ", capacity=" + StringUtils.byteDesc(capacity) + 325 ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" + 326 writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" + 327 persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); 328 } 329 330 private void sanityCheckConfigs() { 331 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 332 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 333 Preconditions.checkArgument(minFactor <= acceptableFactor, MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 334 Preconditions.checkArgument(extraFreeFactor >= 0, EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 335 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 336 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 337 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 338 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, SINGLE_FACTOR_CONFIG_NAME + ", " + 339 MULTI_FACTOR_CONFIG_NAME + ", and " + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 340 } 341 342 /** 343 * Called by the constructor to start the writer threads. Used by tests that need to override 344 * starting the threads. 345 */ 346 @VisibleForTesting 347 protected void startWriterThreads() { 348 for (WriterThread thread : writerThreads) { 349 thread.start(); 350 } 351 } 352 353 @VisibleForTesting 354 boolean isCacheEnabled() { 355 return this.cacheEnabled; 356 } 357 358 @Override 359 public long getMaxSize() { 360 return this.cacheCapacity; 361 } 362 363 public String getIoEngine() { 364 return ioEngine.toString(); 365 } 366 367 /** 368 * Get the IOEngine from the IO engine name 369 * @param ioEngineName 370 * @param capacity 371 * @param persistencePath 372 * @return the IOEngine 373 * @throws IOException 374 */ 375 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 376 throws IOException { 377 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 378 // In order to make the usage simple, we only need the prefix 'files:' in 379 // document whether one or multiple file(s), but also support 'file:' for 380 // the compatibility 381 String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1) 382 .split(FileIOEngine.FILE_DELIMITER); 383 return new FileIOEngine(capacity, persistencePath != null, filePaths); 384 } else if (ioEngineName.startsWith("offheap")) { 385 return new ByteBufferIOEngine(capacity); 386 } else if (ioEngineName.startsWith("mmap:")) { 387 return new FileMmapEngine(ioEngineName.substring(5), capacity); 388 } else { 389 throw new IllegalArgumentException( 390 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 391 } 392 } 393 394 /** 395 * Cache the block with the specified name and buffer. 396 * @param cacheKey block's cache key 397 * @param buf block buffer 398 */ 399 @Override 400 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 401 cacheBlock(cacheKey, buf, false); 402 } 403 404 /** 405 * Cache the block with the specified name and buffer. 406 * @param cacheKey block's cache key 407 * @param cachedItem block buffer 408 * @param inMemory if block is in-memory 409 */ 410 @Override 411 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 412 cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); 413 } 414 415 /** 416 * Cache the block to ramCache 417 * @param cacheKey block's cache key 418 * @param cachedItem block buffer 419 * @param inMemory if block is in-memory 420 * @param wait if true, blocking wait when queue is full 421 */ 422 private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 423 boolean wait) { 424 if (cacheEnabled) { 425 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 426 if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) { 427 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 428 } 429 } else { 430 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 431 } 432 } 433 } 434 435 private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 436 boolean inMemory, boolean wait) { 437 if (!cacheEnabled) { 438 return; 439 } 440 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 441 // Stuff the entry into the RAM cache so it can get drained to the persistent store 442 RAMQueueEntry re = 443 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); 444 /** 445 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 446 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 447 * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if 448 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 449 * one, then the heap size will mess up (HBASE-20789) 450 */ 451 if (ramCache.putIfAbsent(cacheKey, re) != null) { 452 return; 453 } 454 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 455 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 456 boolean successfulAddition = false; 457 if (wait) { 458 try { 459 successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); 460 } catch (InterruptedException e) { 461 Thread.currentThread().interrupt(); 462 } 463 } else { 464 successfulAddition = bq.offer(re); 465 } 466 if (!successfulAddition) { 467 ramCache.remove(cacheKey); 468 cacheStats.failInsert(); 469 } else { 470 this.blockNumber.increment(); 471 this.heapSize.add(cachedItem.heapSize()); 472 blocksByHFile.add(cacheKey); 473 } 474 } 475 476 /** 477 * Get the buffer of the block with the specified key. 478 * @param key block's cache key 479 * @param caching true if the caller caches blocks on cache misses 480 * @param repeat Whether this is a repeat lookup for the same block 481 * @param updateCacheMetrics Whether we should update cache metrics or not 482 * @return buffer of specified cache key, or null if not in cache 483 */ 484 @Override 485 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 486 boolean updateCacheMetrics) { 487 if (!cacheEnabled) { 488 return null; 489 } 490 RAMQueueEntry re = ramCache.get(key); 491 if (re != null) { 492 if (updateCacheMetrics) { 493 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 494 } 495 re.access(accessCount.incrementAndGet()); 496 return re.getData(); 497 } 498 BucketEntry bucketEntry = backingMap.get(key); 499 if (bucketEntry != null) { 500 long start = System.nanoTime(); 501 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 502 try { 503 lock.readLock().lock(); 504 // We can not read here even if backingMap does contain the given key because its offset 505 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 506 // existence here. 507 if (bucketEntry.equals(backingMap.get(key))) { 508 // TODO : change this area - should be removed after server cells and 509 // 12295 are available 510 int len = bucketEntry.getLength(); 511 if (LOG.isTraceEnabled()) { 512 LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len); 513 } 514 Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len, 515 bucketEntry.deserializerReference(this.deserialiserMap)); 516 long timeTaken = System.nanoTime() - start; 517 if (updateCacheMetrics) { 518 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 519 cacheStats.ioHit(timeTaken); 520 } 521 if (cachedBlock.getMemoryType() == MemoryType.SHARED) { 522 bucketEntry.incrementRefCountAndGet(); 523 } 524 bucketEntry.access(accessCount.incrementAndGet()); 525 if (this.ioErrorStartTime > 0) { 526 ioErrorStartTime = -1; 527 } 528 return cachedBlock; 529 } 530 } catch (IOException ioex) { 531 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 532 checkIOErrorIsTolerated(); 533 } finally { 534 lock.readLock().unlock(); 535 } 536 } 537 if (!repeat && updateCacheMetrics) { 538 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 539 } 540 return null; 541 } 542 543 @VisibleForTesting 544 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { 545 bucketAllocator.freeBlock(bucketEntry.offset()); 546 realCacheSize.add(-1 * bucketEntry.getLength()); 547 blocksByHFile.remove(cacheKey); 548 if (decrementBlockNumber) { 549 this.blockNumber.decrement(); 550 } 551 } 552 553 @Override 554 public boolean evictBlock(BlockCacheKey cacheKey) { 555 return evictBlock(cacheKey, true); 556 } 557 558 private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) { 559 RAMQueueEntry removedBlock = ramCache.remove(cacheKey); 560 if (removedBlock != null) { 561 this.blockNumber.decrement(); 562 this.heapSize.add(-1 * removedBlock.getData().heapSize()); 563 } 564 return removedBlock; 565 } 566 567 public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) { 568 if (!cacheEnabled) { 569 return false; 570 } 571 RAMQueueEntry removedBlock = checkRamCache(cacheKey); 572 BucketEntry bucketEntry = backingMap.get(cacheKey); 573 if (bucketEntry == null) { 574 if (removedBlock != null) { 575 cacheStats.evicted(0, cacheKey.isPrimary()); 576 return true; 577 } else { 578 return false; 579 } 580 } 581 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 582 try { 583 lock.writeLock().lock(); 584 int refCount = bucketEntry.getRefCount(); 585 if (refCount == 0) { 586 if (backingMap.remove(cacheKey, bucketEntry)) { 587 blockEvicted(cacheKey, bucketEntry, removedBlock == null); 588 } else { 589 return false; 590 } 591 } else { 592 if(!deletedBlock) { 593 if (LOG.isDebugEnabled()) { 594 LOG.debug("This block " + cacheKey + " is still referred by " + refCount 595 + " readers. Can not be freed now"); 596 } 597 return false; 598 } else { 599 if (LOG.isDebugEnabled()) { 600 LOG.debug("This block " + cacheKey + " is still referred by " + refCount 601 + " readers. Can not be freed now. Hence will mark this" 602 + " for evicting at a later point"); 603 } 604 bucketEntry.markForEvict(); 605 } 606 } 607 } finally { 608 lock.writeLock().unlock(); 609 } 610 cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); 611 return true; 612 } 613 614 /* 615 * Statistics thread. Periodically output cache statistics to the log. 616 */ 617 private static class StatisticsThread extends Thread { 618 private final BucketCache bucketCache; 619 620 public StatisticsThread(BucketCache bucketCache) { 621 super("BucketCacheStatsThread"); 622 setDaemon(true); 623 this.bucketCache = bucketCache; 624 } 625 626 @Override 627 public void run() { 628 bucketCache.logStats(); 629 } 630 } 631 632 public void logStats() { 633 long totalSize = bucketAllocator.getTotalSize(); 634 long usedSize = bucketAllocator.getUsedSize(); 635 long freeSize = totalSize - usedSize; 636 long cacheSize = getRealCacheSize(); 637 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + 638 "totalSize=" + StringUtils.byteDesc(totalSize) + ", " + 639 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + 640 "usedSize=" + StringUtils.byteDesc(usedSize) +", " + 641 "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " + 642 "accesses=" + cacheStats.getRequestCount() + ", " + 643 "hits=" + cacheStats.getHitCount() + ", " + 644 "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " + 645 "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " + 646 "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : 647 (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + 648 "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + 649 "cachingHits=" + cacheStats.getHitCachingCount() + ", " + 650 "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? "0," : 651 (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + 652 "evictions=" + cacheStats.getEvictionCount() + ", " + 653 "evicted=" + cacheStats.getEvictedCount() + ", " + 654 "evictedPerRun=" + cacheStats.evictedPerEviction()); 655 cacheStats.reset(); 656 } 657 658 public long getRealCacheSize() { 659 return this.realCacheSize.sum(); 660 } 661 662 private long acceptableSize() { 663 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 664 } 665 666 @VisibleForTesting 667 long getPartitionSize(float partitionFactor) { 668 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 669 } 670 671 /** 672 * Return the count of bucketSizeinfos still need free space 673 */ 674 private int bucketSizesAboveThresholdCount(float minFactor) { 675 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 676 int fullCount = 0; 677 for (int i = 0; i < stats.length; i++) { 678 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 679 freeGoal = Math.max(freeGoal, 1); 680 if (stats[i].freeCount() < freeGoal) { 681 fullCount++; 682 } 683 } 684 return fullCount; 685 } 686 687 /** 688 * This method will find the buckets that are minimally occupied 689 * and are not reference counted and will free them completely 690 * without any constraint on the access times of the elements, 691 * and as a process will completely free at most the number of buckets 692 * passed, sometimes it might not due to changing refCounts 693 * 694 * @param completelyFreeBucketsNeeded number of buckets to free 695 **/ 696 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 697 if (completelyFreeBucketsNeeded != 0) { 698 // First we will build a set where the offsets are reference counted, usually 699 // this set is small around O(Handler Count) unless something else is wrong 700 Set<Integer> inUseBuckets = new HashSet<Integer>(); 701 for (BucketEntry entry : backingMap.values()) { 702 if (entry.getRefCount() != 0) { 703 inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset())); 704 } 705 } 706 707 Set<Integer> candidateBuckets = bucketAllocator.getLeastFilledBuckets( 708 inUseBuckets, completelyFreeBucketsNeeded); 709 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 710 if (candidateBuckets.contains(bucketAllocator 711 .getBucketIndex(entry.getValue().offset()))) { 712 evictBlock(entry.getKey(), false); 713 } 714 } 715 } 716 } 717 718 /** 719 * Free the space if the used size reaches acceptableSize() or one size block 720 * couldn't be allocated. When freeing the space, we use the LRU algorithm and 721 * ensure there must be some blocks evicted 722 * @param why Why we are being called 723 */ 724 private void freeSpace(final String why) { 725 // Ensure only one freeSpace progress at a time 726 if (!freeSpaceLock.tryLock()) { 727 return; 728 } 729 try { 730 freeInProgress = true; 731 long bytesToFreeWithoutExtra = 0; 732 // Calculate free byte for each bucketSizeinfo 733 StringBuilder msgBuffer = LOG.isDebugEnabled()? new StringBuilder(): null; 734 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 735 long[] bytesToFreeForBucket = new long[stats.length]; 736 for (int i = 0; i < stats.length; i++) { 737 bytesToFreeForBucket[i] = 0; 738 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 739 freeGoal = Math.max(freeGoal, 1); 740 if (stats[i].freeCount() < freeGoal) { 741 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 742 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 743 if (msgBuffer != null) { 744 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 745 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 746 } 747 } 748 } 749 if (msgBuffer != null) { 750 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 751 } 752 753 if (bytesToFreeWithoutExtra <= 0) { 754 return; 755 } 756 long currentSize = bucketAllocator.getUsedSize(); 757 long totalSize = bucketAllocator.getTotalSize(); 758 if (LOG.isDebugEnabled() && msgBuffer != null) { 759 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + 760 " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + 761 StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize)); 762 } 763 764 long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra 765 * (1 + extraFreeFactor)); 766 767 // Instantiate priority buckets 768 BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra, 769 blockSize, getPartitionSize(singleFactor)); 770 BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra, 771 blockSize, getPartitionSize(multiFactor)); 772 BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra, 773 blockSize, getPartitionSize(memoryFactor)); 774 775 // Scan entire map putting bucket entry into appropriate bucket entry 776 // group 777 for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 778 switch (bucketEntryWithKey.getValue().getPriority()) { 779 case SINGLE: { 780 bucketSingle.add(bucketEntryWithKey); 781 break; 782 } 783 case MULTI: { 784 bucketMulti.add(bucketEntryWithKey); 785 break; 786 } 787 case MEMORY: { 788 bucketMemory.add(bucketEntryWithKey); 789 break; 790 } 791 } 792 } 793 794 PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<>(3, 795 Comparator.comparingLong(BucketEntryGroup::overflow)); 796 797 bucketQueue.add(bucketSingle); 798 bucketQueue.add(bucketMulti); 799 bucketQueue.add(bucketMemory); 800 801 int remainingBuckets = bucketQueue.size(); 802 long bytesFreed = 0; 803 804 BucketEntryGroup bucketGroup; 805 while ((bucketGroup = bucketQueue.poll()) != null) { 806 long overflow = bucketGroup.overflow(); 807 if (overflow > 0) { 808 long bucketBytesToFree = Math.min(overflow, 809 (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 810 bytesFreed += bucketGroup.free(bucketBytesToFree); 811 } 812 remainingBuckets--; 813 } 814 815 // Check and free if there are buckets that still need freeing of space 816 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 817 bucketQueue.clear(); 818 remainingBuckets = 3; 819 820 bucketQueue.add(bucketSingle); 821 bucketQueue.add(bucketMulti); 822 bucketQueue.add(bucketMemory); 823 824 while ((bucketGroup = bucketQueue.poll()) != null) { 825 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 826 bytesFreed += bucketGroup.free(bucketBytesToFree); 827 remainingBuckets--; 828 } 829 } 830 831 // Even after the above free we might still need freeing because of the 832 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 833 // there might be some buckets where the occupancy is very sparse and thus are not 834 // yielding the free for the other bucket sizes, the fix for this to evict some 835 // of the buckets, we do this by evicting the buckets that are least fulled 836 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * 837 bucketSizesAboveThresholdCount(1.0f)); 838 839 if (LOG.isDebugEnabled()) { 840 long single = bucketSingle.totalSize(); 841 long multi = bucketMulti.totalSize(); 842 long memory = bucketMemory.totalSize(); 843 if (LOG.isDebugEnabled()) { 844 LOG.debug("Bucket cache free space completed; " + "freed=" 845 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" 846 + StringUtils.byteDesc(totalSize) + ", " + "single=" 847 + StringUtils.byteDesc(single) + ", " + "multi=" 848 + StringUtils.byteDesc(multi) + ", " + "memory=" 849 + StringUtils.byteDesc(memory)); 850 } 851 } 852 853 } catch (Throwable t) { 854 LOG.warn("Failed freeing space", t); 855 } finally { 856 cacheStats.evict(); 857 freeInProgress = false; 858 freeSpaceLock.unlock(); 859 } 860 } 861 862 // This handles flushing the RAM cache to IOEngine. 863 @VisibleForTesting 864 class WriterThread extends HasThread { 865 private final BlockingQueue<RAMQueueEntry> inputQueue; 866 private volatile boolean writerEnabled = true; 867 868 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 869 super("BucketCacheWriterThread"); 870 this.inputQueue = queue; 871 } 872 873 // Used for test 874 @VisibleForTesting 875 void disableWriter() { 876 this.writerEnabled = false; 877 } 878 879 @Override 880 public void run() { 881 List<RAMQueueEntry> entries = new ArrayList<>(); 882 try { 883 while (cacheEnabled && writerEnabled) { 884 try { 885 try { 886 // Blocks 887 entries = getRAMQueueEntries(inputQueue, entries); 888 } catch (InterruptedException ie) { 889 if (!cacheEnabled) break; 890 } 891 doDrain(entries); 892 } catch (Exception ioe) { 893 LOG.error("WriterThread encountered error", ioe); 894 } 895 } 896 } catch (Throwable t) { 897 LOG.warn("Failed doing drain", t); 898 } 899 LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); 900 } 901 902 /** 903 * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing 904 * cache with a new block for the same cache key. there's a corner case: one thread cache a 905 * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another 906 * new block with the same cache key do the same thing for the same cache key, so if not evict 907 * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone 908 * but the bucketAllocator do not free its memory. 909 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 910 * cacheKey, Cacheable newBlock) 911 * @param key Block cache key 912 * @param bucketEntry Bucket entry to put into backingMap. 913 */ 914 private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 915 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 916 if (previousEntry != null && previousEntry != bucketEntry) { 917 ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset()); 918 lock.writeLock().lock(); 919 try { 920 blockEvicted(key, previousEntry, false); 921 } finally { 922 lock.writeLock().unlock(); 923 } 924 } 925 } 926 927 /** 928 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. 929 * Process all that are passed in even if failure being sure to remove from ramCache else we'll 930 * never undo the references and we'll OOME. 931 * @param entries Presumes list passed in here will be processed by this invocation only. No 932 * interference expected. 933 * @throws InterruptedException 934 */ 935 @VisibleForTesting 936 void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException { 937 if (entries.isEmpty()) { 938 return; 939 } 940 // This method is a little hard to follow. We run through the passed in entries and for each 941 // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must 942 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 943 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 944 // filling ramCache. We do the clean up by again running through the passed in entries 945 // doing extra work when we find a non-null bucketEntries corresponding entry. 946 final int size = entries.size(); 947 BucketEntry[] bucketEntries = new BucketEntry[size]; 948 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 949 // when we go to add an entry by going around the loop again without upping the index. 950 int index = 0; 951 while (cacheEnabled && index < size) { 952 RAMQueueEntry re = null; 953 try { 954 re = entries.get(index); 955 if (re == null) { 956 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 957 index++; 958 continue; 959 } 960 BucketEntry bucketEntry = 961 re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize); 962 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 963 bucketEntries[index] = bucketEntry; 964 if (ioErrorStartTime > 0) { 965 ioErrorStartTime = -1; 966 } 967 index++; 968 } catch (BucketAllocatorException fle) { 969 LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle); 970 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 971 bucketEntries[index] = null; 972 index++; 973 } catch (CacheFullException cfe) { 974 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 975 if (!freeInProgress) { 976 freeSpace("Full!"); 977 } else { 978 Thread.sleep(50); 979 } 980 } catch (IOException ioex) { 981 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 982 LOG.error("Failed writing to bucket cache", ioex); 983 checkIOErrorIsTolerated(); 984 } 985 } 986 987 // Make sure data pages are written on media before we update maps. 988 try { 989 ioEngine.sync(); 990 } catch (IOException ioex) { 991 LOG.error("Failed syncing IO engine", ioex); 992 checkIOErrorIsTolerated(); 993 // Since we failed sync, free the blocks in bucket allocator 994 for (int i = 0; i < entries.size(); ++i) { 995 if (bucketEntries[i] != null) { 996 bucketAllocator.freeBlock(bucketEntries[i].offset()); 997 bucketEntries[i] = null; 998 } 999 } 1000 } 1001 1002 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1003 // success or error. 1004 for (int i = 0; i < size; ++i) { 1005 BlockCacheKey key = entries.get(i).getKey(); 1006 // Only add if non-null entry. 1007 if (bucketEntries[i] != null) { 1008 putIntoBackingMap(key, bucketEntries[i]); 1009 } 1010 // Always remove from ramCache even if we failed adding it to the block cache above. 1011 RAMQueueEntry ramCacheEntry = ramCache.remove(key); 1012 if (ramCacheEntry != null) { 1013 heapSize.add(-1 * entries.get(i).getData().heapSize()); 1014 } else if (bucketEntries[i] != null){ 1015 // Block should have already been evicted. Remove it and free space. 1016 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset()); 1017 try { 1018 lock.writeLock().lock(); 1019 int refCount = bucketEntries[i].getRefCount(); 1020 if (refCount == 0) { 1021 if (backingMap.remove(key, bucketEntries[i])) { 1022 blockEvicted(key, bucketEntries[i], false); 1023 } else { 1024 bucketEntries[i].markForEvict(); 1025 } 1026 } else { 1027 bucketEntries[i].markForEvict(); 1028 } 1029 } finally { 1030 lock.writeLock().unlock(); 1031 } 1032 } 1033 } 1034 1035 long used = bucketAllocator.getUsedSize(); 1036 if (used > acceptableSize()) { 1037 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1038 } 1039 return; 1040 } 1041 } 1042 1043 /** 1044 * Blocks until elements available in {@code q} then tries to grab as many as possible 1045 * before returning. 1046 * @param receptacle Where to stash the elements taken from queue. We clear before we use it 1047 * just in case. 1048 * @param q The queue to take from. 1049 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 1050 */ 1051 @VisibleForTesting 1052 static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q, 1053 final List<RAMQueueEntry> receptacle) 1054 throws InterruptedException { 1055 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1056 // ok even if list grew to accommodate thousands. 1057 receptacle.clear(); 1058 receptacle.add(q.take()); 1059 q.drainTo(receptacle); 1060 return receptacle; 1061 } 1062 1063 private void persistToFile() throws IOException { 1064 assert !cacheEnabled; 1065 FileOutputStream fos = null; 1066 ObjectOutputStream oos = null; 1067 try { 1068 if (!ioEngine.isPersistent()) { 1069 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1070 } 1071 fos = new FileOutputStream(persistencePath, false); 1072 oos = new ObjectOutputStream(fos); 1073 oos.writeLong(cacheCapacity); 1074 oos.writeUTF(ioEngine.getClass().getName()); 1075 oos.writeUTF(backingMap.getClass().getName()); 1076 oos.writeObject(deserialiserMap); 1077 oos.writeObject(backingMap); 1078 } finally { 1079 if (oos != null) oos.close(); 1080 if (fos != null) fos.close(); 1081 } 1082 } 1083 1084 @SuppressWarnings("unchecked") 1085 private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException, 1086 ClassNotFoundException { 1087 File persistenceFile = new File(persistencePath); 1088 if (!persistenceFile.exists()) { 1089 return; 1090 } 1091 assert !cacheEnabled; 1092 FileInputStream fis = null; 1093 ObjectInputStream ois = null; 1094 try { 1095 if (!ioEngine.isPersistent()) 1096 throw new IOException( 1097 "Attempt to restore non-persistent cache mappings!"); 1098 fis = new FileInputStream(persistencePath); 1099 ois = new ObjectInputStream(fis); 1100 long capacitySize = ois.readLong(); 1101 if (capacitySize != cacheCapacity) 1102 throw new IOException("Mismatched cache capacity:" 1103 + StringUtils.byteDesc(capacitySize) + ", expected: " 1104 + StringUtils.byteDesc(cacheCapacity)); 1105 String ioclass = ois.readUTF(); 1106 String mapclass = ois.readUTF(); 1107 if (!ioEngine.getClass().getName().equals(ioclass)) 1108 throw new IOException("Class name for IO engine mismatch: " + ioclass 1109 + ", expected:" + ioEngine.getClass().getName()); 1110 if (!backingMap.getClass().getName().equals(mapclass)) 1111 throw new IOException("Class name for cache map mismatch: " + mapclass 1112 + ", expected:" + backingMap.getClass().getName()); 1113 UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois 1114 .readObject(); 1115 ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile = 1116 (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject(); 1117 BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes, 1118 backingMapFromFile, realCacheSize); 1119 bucketAllocator = allocator; 1120 deserialiserMap = deserMap; 1121 backingMap = backingMapFromFile; 1122 } finally { 1123 if (ois != null) ois.close(); 1124 if (fis != null) fis.close(); 1125 if (!persistenceFile.delete()) { 1126 throw new IOException("Failed deleting persistence file " 1127 + persistenceFile.getAbsolutePath()); 1128 } 1129 } 1130 } 1131 1132 /** 1133 * Check whether we tolerate IO error this time. If the duration of IOEngine 1134 * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the 1135 * cache 1136 */ 1137 private void checkIOErrorIsTolerated() { 1138 long now = EnvironmentEdgeManager.currentTime(); 1139 if (this.ioErrorStartTime > 0) { 1140 if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) { 1141 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration + 1142 "ms, disabling cache, please check your IOEngine"); 1143 disableCache(); 1144 } 1145 } else { 1146 this.ioErrorStartTime = now; 1147 } 1148 } 1149 1150 /** 1151 * Used to shut down the cache -or- turn it off in the case of something broken. 1152 */ 1153 private void disableCache() { 1154 if (!cacheEnabled) return; 1155 cacheEnabled = false; 1156 ioEngine.shutdown(); 1157 this.scheduleThreadPool.shutdown(); 1158 for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt(); 1159 this.ramCache.clear(); 1160 if (!ioEngine.isPersistent() || persistencePath == null) { 1161 // If persistent ioengine and a path, we will serialize out the backingMap. 1162 this.backingMap.clear(); 1163 } 1164 } 1165 1166 private void join() throws InterruptedException { 1167 for (int i = 0; i < writerThreads.length; ++i) 1168 writerThreads[i].join(); 1169 } 1170 1171 @Override 1172 public void shutdown() { 1173 disableCache(); 1174 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() 1175 + "; path to write=" + persistencePath); 1176 if (ioEngine.isPersistent() && persistencePath != null) { 1177 try { 1178 join(); 1179 persistToFile(); 1180 } catch (IOException ex) { 1181 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1182 } catch (InterruptedException e) { 1183 LOG.warn("Failed to persist data on exit", e); 1184 } 1185 } 1186 } 1187 1188 @Override 1189 public CacheStats getStats() { 1190 return cacheStats; 1191 } 1192 1193 public BucketAllocator getAllocator() { 1194 return this.bucketAllocator; 1195 } 1196 1197 @Override 1198 public long heapSize() { 1199 return this.heapSize.sum(); 1200 } 1201 1202 @Override 1203 public long size() { 1204 return this.realCacheSize.sum(); 1205 } 1206 1207 @Override 1208 public long getCurrentDataSize() { 1209 return size(); 1210 } 1211 1212 @Override 1213 public long getFreeSize() { 1214 return this.bucketAllocator.getFreeSize(); 1215 } 1216 1217 @Override 1218 public long getBlockCount() { 1219 return this.blockNumber.sum(); 1220 } 1221 1222 @Override 1223 public long getDataBlockCount() { 1224 return getBlockCount(); 1225 } 1226 1227 @Override 1228 public long getCurrentSize() { 1229 return this.bucketAllocator.getUsedSize(); 1230 } 1231 1232 /** 1233 * Evicts all blocks for a specific HFile. 1234 * <p> 1235 * This is used for evict-on-close to remove all blocks of a specific HFile. 1236 * 1237 * @return the number of blocks evicted 1238 */ 1239 @Override 1240 public int evictBlocksByHfileName(String hfileName) { 1241 Set<BlockCacheKey> keySet = blocksByHFile.subSet( 1242 new BlockCacheKey(hfileName, Long.MIN_VALUE), true, 1243 new BlockCacheKey(hfileName, Long.MAX_VALUE), true); 1244 1245 int numEvicted = 0; 1246 for (BlockCacheKey key : keySet) { 1247 if (evictBlock(key)) { 1248 ++numEvicted; 1249 } 1250 } 1251 1252 return numEvicted; 1253 } 1254 1255 /** 1256 * Item in cache. We expect this to be where most memory goes. Java uses 8 1257 * bytes just for object headers; after this, we want to use as little as 1258 * possible - so we only use 8 bytes, but in order to do so we end up messing 1259 * around with all this Java casting stuff. Offset stored as 5 bytes that make 1260 * up the long. Doubt we'll see devices this big for ages. Offsets are divided 1261 * by 256. So 5 bytes gives us 256TB or so. 1262 */ 1263 static class BucketEntry implements Serializable { 1264 private static final long serialVersionUID = -6741504807982257534L; 1265 1266 // access counter comparator, descending order 1267 static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() { 1268 1269 @Override 1270 public int compare(BucketEntry o1, BucketEntry o2) { 1271 return Long.compare(o2.accessCounter, o1.accessCounter); 1272 } 1273 }; 1274 1275 private int offsetBase; 1276 private int length; 1277 private byte offset1; 1278 byte deserialiserIndex; 1279 private volatile long accessCounter; 1280 private BlockPriority priority; 1281 1282 /** 1283 * Time this block was cached. Presumes we are created just before we are added to the cache. 1284 */ 1285 private final long cachedTime = System.nanoTime(); 1286 1287 BucketEntry(long offset, int length, long accessCounter, boolean inMemory) { 1288 setOffset(offset); 1289 this.length = length; 1290 this.accessCounter = accessCounter; 1291 if (inMemory) { 1292 this.priority = BlockPriority.MEMORY; 1293 } else { 1294 this.priority = BlockPriority.SINGLE; 1295 } 1296 } 1297 1298 long offset() { // Java has no unsigned numbers 1299 long o = ((long) offsetBase) & 0xFFFFFFFFL; //This needs the L cast otherwise it will be sign extended as a negative number. 1300 o += (((long) (offset1)) & 0xFF) << 32; //The 0xFF here does not need the L cast because it is treated as a positive int. 1301 return o << 8; 1302 } 1303 1304 private void setOffset(long value) { 1305 assert (value & 0xFF) == 0; 1306 value >>= 8; 1307 offsetBase = (int) value; 1308 offset1 = (byte) (value >> 32); 1309 } 1310 1311 public int getLength() { 1312 return length; 1313 } 1314 1315 protected CacheableDeserializer<Cacheable> deserializerReference( 1316 UniqueIndexMap<Integer> deserialiserMap) { 1317 return CacheableDeserializerIdManager.getDeserializer(deserialiserMap 1318 .unmap(deserialiserIndex)); 1319 } 1320 1321 protected void setDeserialiserReference( 1322 CacheableDeserializer<Cacheable> deserializer, 1323 UniqueIndexMap<Integer> deserialiserMap) { 1324 this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer 1325 .getDeserialiserIdentifier())); 1326 } 1327 1328 /** 1329 * Block has been accessed. Update its local access counter. 1330 */ 1331 public void access(long accessCounter) { 1332 this.accessCounter = accessCounter; 1333 if (this.priority == BlockPriority.SINGLE) { 1334 this.priority = BlockPriority.MULTI; 1335 } 1336 } 1337 1338 public BlockPriority getPriority() { 1339 return this.priority; 1340 } 1341 1342 public long getCachedTime() { 1343 return cachedTime; 1344 } 1345 1346 protected int getRefCount() { 1347 return 0; 1348 } 1349 1350 protected int incrementRefCountAndGet() { 1351 return 0; 1352 } 1353 1354 protected int decrementRefCountAndGet() { 1355 return 0; 1356 } 1357 1358 protected boolean isMarkedForEvict() { 1359 return false; 1360 } 1361 1362 protected void markForEvict() { 1363 // noop; 1364 } 1365 } 1366 1367 static class SharedMemoryBucketEntry extends BucketEntry { 1368 private static final long serialVersionUID = -2187147283772338481L; 1369 1370 // Set this when we were not able to forcefully evict the block 1371 private volatile boolean markedForEvict; 1372 private AtomicInteger refCount = new AtomicInteger(0); 1373 1374 SharedMemoryBucketEntry(long offset, int length, long accessCounter, boolean inMemory) { 1375 super(offset, length, accessCounter, inMemory); 1376 } 1377 1378 @Override 1379 protected int getRefCount() { 1380 return this.refCount.get(); 1381 } 1382 1383 @Override 1384 protected int incrementRefCountAndGet() { 1385 return this.refCount.incrementAndGet(); 1386 } 1387 1388 @Override 1389 protected int decrementRefCountAndGet() { 1390 return this.refCount.decrementAndGet(); 1391 } 1392 1393 @Override 1394 protected boolean isMarkedForEvict() { 1395 return this.markedForEvict; 1396 } 1397 1398 @Override 1399 protected void markForEvict() { 1400 this.markedForEvict = true; 1401 } 1402 } 1403 1404 /** 1405 * Used to group bucket entries into priority buckets. There will be a 1406 * BucketEntryGroup for each priority (single, multi, memory). Once bucketed, 1407 * the eviction algorithm takes the appropriate number of elements out of each 1408 * according to configuration parameters and their relative sizes. 1409 */ 1410 private class BucketEntryGroup { 1411 1412 private CachedEntryQueue queue; 1413 private long totalSize = 0; 1414 private long bucketSize; 1415 1416 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1417 this.bucketSize = bucketSize; 1418 queue = new CachedEntryQueue(bytesToFree, blockSize); 1419 totalSize = 0; 1420 } 1421 1422 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1423 totalSize += block.getValue().getLength(); 1424 queue.add(block); 1425 } 1426 1427 public long free(long toFree) { 1428 Map.Entry<BlockCacheKey, BucketEntry> entry; 1429 long freedBytes = 0; 1430 // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free 1431 // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 1432 while ((entry = queue.pollLast()) != null) { 1433 if (evictBlock(entry.getKey(), false)) { 1434 freedBytes += entry.getValue().getLength(); 1435 } 1436 if (freedBytes >= toFree) { 1437 return freedBytes; 1438 } 1439 } 1440 return freedBytes; 1441 } 1442 1443 public long overflow() { 1444 return totalSize - bucketSize; 1445 } 1446 1447 public long totalSize() { 1448 return totalSize; 1449 } 1450 } 1451 1452 /** 1453 * Block Entry stored in the memory with key,data and so on 1454 */ 1455 @VisibleForTesting 1456 static class RAMQueueEntry { 1457 private BlockCacheKey key; 1458 private Cacheable data; 1459 private long accessCounter; 1460 private boolean inMemory; 1461 1462 public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, 1463 boolean inMemory) { 1464 this.key = bck; 1465 this.data = data; 1466 this.accessCounter = accessCounter; 1467 this.inMemory = inMemory; 1468 } 1469 1470 public Cacheable getData() { 1471 return data; 1472 } 1473 1474 public BlockCacheKey getKey() { 1475 return key; 1476 } 1477 1478 public void access(long accessCounter) { 1479 this.accessCounter = accessCounter; 1480 } 1481 1482 private BucketEntry getBucketEntry(IOEngine ioEngine, long offset, int len) { 1483 if (ioEngine.usesSharedMemory()) { 1484 if (UnsafeAvailChecker.isAvailable()) { 1485 return new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory); 1486 } else { 1487 return new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory); 1488 } 1489 } else { 1490 return new BucketEntry(offset, len, accessCounter, inMemory); 1491 } 1492 } 1493 1494 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator, 1495 final UniqueIndexMap<Integer> deserialiserMap, final LongAdder realCacheSize) 1496 throws IOException { 1497 int len = data.getSerializedLength(); 1498 // This cacheable thing can't be serialized 1499 if (len == 0) { 1500 return null; 1501 } 1502 long offset = bucketAllocator.allocateBlock(len); 1503 boolean succ = false; 1504 BucketEntry bucketEntry; 1505 try { 1506 bucketEntry = getBucketEntry(ioEngine, offset, len); 1507 bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); 1508 if (data instanceof HFileBlock) { 1509 // If an instance of HFileBlock, save on some allocations. 1510 HFileBlock block = (HFileBlock) data; 1511 ByteBuff sliceBuf = block.getBufferReadOnly(); 1512 ByteBuffer metadata = block.getMetaData(); 1513 ioEngine.write(sliceBuf, offset); 1514 ioEngine.write(metadata, offset + len - metadata.limit()); 1515 } else { 1516 ByteBuffer bb = ByteBuffer.allocate(len); 1517 data.serialize(bb, true); 1518 ioEngine.write(bb, offset); 1519 } 1520 succ = true; 1521 } finally { 1522 if (!succ) { 1523 bucketAllocator.freeBlock(offset); 1524 } 1525 } 1526 realCacheSize.add(len); 1527 return bucketEntry; 1528 } 1529 } 1530 1531 /** 1532 * Only used in test 1533 * @throws InterruptedException 1534 */ 1535 void stopWriterThreads() throws InterruptedException { 1536 for (WriterThread writerThread : writerThreads) { 1537 writerThread.disableWriter(); 1538 writerThread.interrupt(); 1539 writerThread.join(); 1540 } 1541 } 1542 1543 @Override 1544 public Iterator<CachedBlock> iterator() { 1545 // Don't bother with ramcache since stuff is in here only a little while. 1546 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = 1547 this.backingMap.entrySet().iterator(); 1548 return new Iterator<CachedBlock>() { 1549 private final long now = System.nanoTime(); 1550 1551 @Override 1552 public boolean hasNext() { 1553 return i.hasNext(); 1554 } 1555 1556 @Override 1557 public CachedBlock next() { 1558 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 1559 return new CachedBlock() { 1560 @Override 1561 public String toString() { 1562 return BlockCacheUtil.toString(this, now); 1563 } 1564 1565 @Override 1566 public BlockPriority getBlockPriority() { 1567 return e.getValue().getPriority(); 1568 } 1569 1570 @Override 1571 public BlockType getBlockType() { 1572 // Not held by BucketEntry. Could add it if wanted on BucketEntry creation. 1573 return null; 1574 } 1575 1576 @Override 1577 public long getOffset() { 1578 return e.getKey().getOffset(); 1579 } 1580 1581 @Override 1582 public long getSize() { 1583 return e.getValue().getLength(); 1584 } 1585 1586 @Override 1587 public long getCachedTime() { 1588 return e.getValue().getCachedTime(); 1589 } 1590 1591 @Override 1592 public String getFilename() { 1593 return e.getKey().getHfileName(); 1594 } 1595 1596 @Override 1597 public int compareTo(CachedBlock other) { 1598 int diff = this.getFilename().compareTo(other.getFilename()); 1599 if (diff != 0) return diff; 1600 1601 diff = Long.compare(this.getOffset(), other.getOffset()); 1602 if (diff != 0) return diff; 1603 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1604 throw new IllegalStateException("" + this.getCachedTime() + ", " + 1605 other.getCachedTime()); 1606 } 1607 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1608 } 1609 1610 @Override 1611 public int hashCode() { 1612 return e.getKey().hashCode(); 1613 } 1614 1615 @Override 1616 public boolean equals(Object obj) { 1617 if (obj instanceof CachedBlock) { 1618 CachedBlock cb = (CachedBlock)obj; 1619 return compareTo(cb) == 0; 1620 } else { 1621 return false; 1622 } 1623 } 1624 }; 1625 } 1626 1627 @Override 1628 public void remove() { 1629 throw new UnsupportedOperationException(); 1630 } 1631 }; 1632 } 1633 1634 @Override 1635 public BlockCache[] getBlockCaches() { 1636 return null; 1637 } 1638 1639 @Override 1640 public void returnBlock(BlockCacheKey cacheKey, Cacheable block) { 1641 if (block.getMemoryType() == MemoryType.SHARED) { 1642 BucketEntry bucketEntry = backingMap.get(cacheKey); 1643 if (bucketEntry != null) { 1644 int refCount = bucketEntry.decrementRefCountAndGet(); 1645 if (refCount == 0 && bucketEntry.isMarkedForEvict()) { 1646 evictBlock(cacheKey); 1647 } 1648 } 1649 } 1650 } 1651 1652 @VisibleForTesting 1653 public int getRefCount(BlockCacheKey cacheKey) { 1654 BucketEntry bucketEntry = backingMap.get(cacheKey); 1655 if (bucketEntry != null) { 1656 return bucketEntry.getRefCount(); 1657 } 1658 return 0; 1659 } 1660 1661 float getAcceptableFactor() { 1662 return acceptableFactor; 1663 } 1664 1665 float getMinFactor() { 1666 return minFactor; 1667 } 1668 1669 float getExtraFreeFactor() { 1670 return extraFreeFactor; 1671 } 1672 1673 float getSingleFactor() { 1674 return singleFactor; 1675 } 1676 1677 float getMultiFactor() { 1678 return multiFactor; 1679 } 1680 1681 float getMemoryFactor() { 1682 return memoryFactor; 1683 } 1684}