001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021package org.apache.hadoop.hbase.io.hfile.bucket; 022 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.FileOutputStream; 026import java.io.IOException; 027import java.nio.ByteBuffer; 028import java.util.ArrayList; 029import java.util.Comparator; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.NavigableSet; 035import java.util.PriorityQueue; 036import java.util.Set; 037import java.util.concurrent.ArrayBlockingQueue; 038import java.util.concurrent.BlockingQueue; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentMap; 041import java.util.concurrent.ConcurrentSkipListSet; 042import java.util.concurrent.Executors; 043import java.util.concurrent.ScheduledExecutorService; 044import java.util.concurrent.TimeUnit; 045import java.util.concurrent.atomic.AtomicBoolean; 046import java.util.concurrent.atomic.AtomicLong; 047import java.util.concurrent.atomic.LongAdder; 048import java.util.concurrent.locks.Lock; 049import 
java.util.concurrent.locks.ReentrantLock; 050import java.util.concurrent.locks.ReentrantReadWriteLock; 051import java.util.function.Consumer; 052 053import org.apache.hadoop.conf.Configuration; 054import org.apache.hadoop.hbase.HBaseConfiguration; 055import org.apache.hadoop.hbase.TableName; 056import org.apache.hadoop.hbase.client.Admin; 057import org.apache.hadoop.hbase.io.ByteBuffAllocator; 058import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 059import org.apache.hadoop.hbase.io.HeapSize; 060import org.apache.hadoop.hbase.io.hfile.BlockCache; 061import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 062import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; 063import org.apache.hadoop.hbase.io.hfile.BlockPriority; 064import org.apache.hadoop.hbase.io.hfile.BlockType; 065import org.apache.hadoop.hbase.io.hfile.CacheStats; 066import org.apache.hadoop.hbase.io.hfile.Cacheable; 067import org.apache.hadoop.hbase.io.hfile.CachedBlock; 068import org.apache.hadoop.hbase.io.hfile.HFileBlock; 069import org.apache.hadoop.hbase.nio.ByteBuff; 070import org.apache.hadoop.hbase.nio.RefCnt; 071import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 073import org.apache.hadoop.hbase.util.IdReadWriteLock; 074import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType; 075import org.apache.hadoop.util.StringUtils; 076import org.apache.yetus.audience.InterfaceAudience; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 081import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 082import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 083 084import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 085 086/** 087 * BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses 088 * BucketCache#ramCache and 
BucketCache#backingMap in order to
 * determine if a given element is in the cache. The bucket cache can use on-heap or
 * off-heap memory {@link ByteBufferIOEngine} or in a file {@link FileIOEngine} to
 * store/read the block data.
 *
 * <p>Eviction is via a similar algorithm as used in
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache}
 *
 * <p>BucketCache can be used as mainly a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
 * a BlockCache to decrease CMS GC and heap fragmentation.
 *
 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
 * blocks) to enlarge cache space via a victim cache.
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
  private static final Logger LOG = LoggerFactory.getLogger(BucketCache.class);

  /** Configuration keys for the priority partition factors and the eviction thresholds */
  static final String SINGLE_FACTOR_CONFIG_NAME = "hbase.bucketcache.single.factor";
  static final String MULTI_FACTOR_CONFIG_NAME = "hbase.bucketcache.multi.factor";
  static final String MEMORY_FACTOR_CONFIG_NAME = "hbase.bucketcache.memory.factor";
  static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
  static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
  static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";

  /** Default values used when the corresponding configuration key is not set */
  @VisibleForTesting
  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  static final float DEFAULT_MULTI_FACTOR = 0.50f;
  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
  static final float DEFAULT_MIN_FACTOR = 0.85f;

  private static final float DEFAULT_EXTRA_FREE_FACTOR = 0.10f;
  private static final float DEFAULT_ACCEPT_FACTOR = 0.95f;

  // Number of whole buckets to clear for each bucket size that is still full after eviction
  private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR = 2;

  /**
   * Initial delay and period, in seconds, of the statistics logging task scheduled by the
   * constructor (5 minutes).
   */
  private static final int statThreadPeriod = 5 * 60;

  // NOTE(review): these two defaults are not referenced in the part of the class reviewed here;
  // presumably callers use them for the writerThreadNum / writerQLen constructor arguments.
  final static int DEFAULT_WRITER_THREADS = 3;
  final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

  // Store/read block data
  transient final IOEngine ioEngine;

  // Store the block in this map before writing it to cache
  @VisibleForTesting
  transient final RAMCache ramCache;
  // In this map, store the block's meta data like offset, length
  @VisibleForTesting
  transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;

  /**
   * Flag if the cache is enabled or not... We shut it off if there are IO
   * errors for some time, so that Bucket IO exceptions/errors don't bring down
   * the HBase server.
   */
  private volatile boolean cacheEnabled;

  /**
   * A list of writer queues.  We have a queue per {@link WriterThread} we have running.
   * In other words, the work adding blocks to the BucketCache is divided up amongst the
   * running WriterThreads.  It's done by taking hash of the cache key modulo queue count.
   * WriterThread when it runs takes whatever has been recently added and 'drains' the entries
   * to the BucketCache.  It then updates the ramCache and backingMap accordingly.
   */
  @VisibleForTesting
  transient final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues = new ArrayList<>();
  @VisibleForTesting
  transient final WriterThread[] writerThreads;

  /** Volatile boolean to track if free space is in process or not */
  private volatile boolean freeInProgress = false;
  /** Guards freeSpace() so that only one eviction pass runs at a time */
  private transient final Lock freeSpaceLock = new ReentrantLock();

  /** Running byte total; reduced by the entry length whenever a block is evicted */
  private final LongAdder realCacheSize = new LongAdder();
  /**
   * Sum of {@code Cacheable#heapSize()} of the blocks queued in the ramCache; added to when a
   * block is queued and subtracted from when it is removed from the ramCache.
   */
  private final LongAdder heapSize = new LongAdder();
  /** Current number of cached elements */
  private final LongAdder blockNumber = new LongAdder();

  /** Cache access count (sequential ID) */
  private final AtomicLong accessCount = new AtomicLong();

  /** Time, in milliseconds, to wait for room in a writer queue when caching with wait */
  private static final int DEFAULT_CACHE_WAIT_TIME = 50;

  /**
   * Used in tests. If this flag is false and the cache speed is very fast,
   * bucket cache will skip some blocks when caching. If the flag is true, we
   * will wait until blocks are flushed to IOEngine.
   */
  @VisibleForTesting
  boolean wait_when_cache = false;

  private final BucketCacheStats cacheStats = new BucketCacheStats();

  /** Path handed to the constructor; when non-null the cache tries to restore from it */
  private final String persistencePath;
  /** Capacity handed to the constructor; reported by getMaxSize() */
  private final long cacheCapacity;
  /** Approximate block size */
  private final long blockSize;

  /** Duration of IO errors tolerated before we disable cache, 1 min as default */
  private final int ioErrorsTolerationDuration;
  // 1 min
  public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;

  // Start time of first IO error when reading or writing IO Engine, it will be
  // reset after a successful read/write.
  private volatile long ioErrorStartTime = -1;

  /**
   * A ReentrantReadWriteLock to lock on a particular block identified by offset.
   * The purpose of this is to avoid freeing the block which is being read.
   * <p>
   * Key set of offsets in BucketCache is limited so soft reference is the best choice here.
   */
  @VisibleForTesting
  transient final IdReadWriteLock<Long> offsetLock = new IdReadWriteLock<>(ReferenceType.SOFT);

  /** Cache keys ordered by HFile name first and by block offset second */
  private final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
    int nameComparison = a.getHfileName().compareTo(b.getHfileName());
    if (nameComparison != 0) {
      return nameComparison;
    }
    return Long.compare(a.getOffset(), b.getOffset());
  });

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("BucketCacheStatsExecutor").setDaemon(true).build());

  // Allocate or free space for the block
  private transient BucketAllocator bucketAllocator;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private float minFactor;

  /** Free this floating point factor of extra blocks when evicting.
For example free the number of blocks requested * (1 + extraFreeFactor) */ 233 private float extraFreeFactor; 234 235 /** Single access bucket size */ 236 private float singleFactor; 237 238 /** Multiple access bucket size */ 239 private float multiFactor; 240 241 /** In-memory bucket size */ 242 private float memoryFactor; 243 244 private static final String FILE_VERIFY_ALGORITHM = 245 "hbase.bucketcache.persistent.file.integrity.check.algorithm"; 246 private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; 247 248 /** 249 * Use {@link java.security.MessageDigest} class's encryption algorithms to check 250 * persistent file integrity, default algorithm is MD5 251 * */ 252 private String algorithm; 253 254 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 255 int writerThreadNum, int writerQLen, String persistencePath) throws IOException { 256 this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 257 persistencePath, DEFAULT_ERROR_TOLERATION_DURATION, HBaseConfiguration.create()); 258 } 259 260 public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 261 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, 262 Configuration conf) throws IOException { 263 this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); 264 this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); 265 this.writerThreads = new WriterThread[writerThreadNum]; 266 long blockNumCapacity = capacity / blockSize; 267 if (blockNumCapacity >= Integer.MAX_VALUE) { 268 // Enough for about 32TB of cache! 
269 throw new IllegalArgumentException("Cache capacity is too large, only support 32TB now"); 270 } 271 272 this.acceptableFactor = conf.getFloat(ACCEPT_FACTOR_CONFIG_NAME, DEFAULT_ACCEPT_FACTOR); 273 this.minFactor = conf.getFloat(MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR); 274 this.extraFreeFactor = conf.getFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, DEFAULT_EXTRA_FREE_FACTOR); 275 this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); 276 this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); 277 this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); 278 279 sanityCheckConfigs(); 280 281 LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor + 282 ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor + 283 ", memoryFactor: " + memoryFactor); 284 285 this.cacheCapacity = capacity; 286 this.persistencePath = persistencePath; 287 this.blockSize = blockSize; 288 this.ioErrorsTolerationDuration = ioErrorsTolerationDuration; 289 290 bucketAllocator = new BucketAllocator(capacity, bucketSizes); 291 for (int i = 0; i < writerThreads.length; ++i) { 292 writerQueues.add(new ArrayBlockingQueue<>(writerQLen)); 293 } 294 295 assert writerQueues.size() == writerThreads.length; 296 this.ramCache = new RAMCache(); 297 298 this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity); 299 300 if (ioEngine.isPersistent() && persistencePath != null) { 301 try { 302 retrieveFromFile(bucketSizes); 303 } catch (IOException ioex) { 304 LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex); 305 } 306 } 307 final String threadName = Thread.currentThread().getName(); 308 this.cacheEnabled = true; 309 for (int i = 0; i < writerThreads.length; ++i) { 310 writerThreads[i] = new WriterThread(writerQueues.get(i)); 311 writerThreads[i].setName(threadName + 
"-BucketCacheWriter-" + i); 312 writerThreads[i].setDaemon(true); 313 } 314 startWriterThreads(); 315 316 // Run the statistics thread periodically to print the cache statistics log 317 // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log 318 // every five minutes. 319 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), 320 statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); 321 LOG.info("Started bucket cache; ioengine=" + ioEngineName + 322 ", capacity=" + StringUtils.byteDesc(capacity) + 323 ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" + 324 writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" + 325 persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName()); 326 } 327 328 private void sanityCheckConfigs() { 329 Preconditions.checkArgument(acceptableFactor <= 1 && acceptableFactor >= 0, ACCEPT_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 330 Preconditions.checkArgument(minFactor <= 1 && minFactor >= 0, MIN_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 331 Preconditions.checkArgument(minFactor <= acceptableFactor, MIN_FACTOR_CONFIG_NAME + " must be <= " + ACCEPT_FACTOR_CONFIG_NAME); 332 Preconditions.checkArgument(extraFreeFactor >= 0, EXTRA_FREE_FACTOR_CONFIG_NAME + " must be greater than 0.0"); 333 Preconditions.checkArgument(singleFactor <= 1 && singleFactor >= 0, SINGLE_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 334 Preconditions.checkArgument(multiFactor <= 1 && multiFactor >= 0, MULTI_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 335 Preconditions.checkArgument(memoryFactor <= 1 && memoryFactor >= 0, MEMORY_FACTOR_CONFIG_NAME + " must be between 0.0 and 1.0"); 336 Preconditions.checkArgument((singleFactor + multiFactor + memoryFactor) == 1, SINGLE_FACTOR_CONFIG_NAME + ", " + 337 MULTI_FACTOR_CONFIG_NAME + ", and " + MEMORY_FACTOR_CONFIG_NAME + " segments must add up to 1.0"); 338 } 339 340 /** 341 * 
Called by the constructor to start the writer threads. Used by tests that need to override 342 * starting the threads. 343 */ 344 @VisibleForTesting 345 protected void startWriterThreads() { 346 for (WriterThread thread : writerThreads) { 347 thread.start(); 348 } 349 } 350 351 @VisibleForTesting 352 boolean isCacheEnabled() { 353 return this.cacheEnabled; 354 } 355 356 @Override 357 public long getMaxSize() { 358 return this.cacheCapacity; 359 } 360 361 public String getIoEngine() { 362 return ioEngine.toString(); 363 } 364 365 /** 366 * Get the IOEngine from the IO engine name 367 * @param ioEngineName 368 * @param capacity 369 * @param persistencePath 370 * @return the IOEngine 371 * @throws IOException 372 */ 373 private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) 374 throws IOException { 375 if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) { 376 // In order to make the usage simple, we only need the prefix 'files:' in 377 // document whether one or multiple file(s), but also support 'file:' for 378 // the compatibility 379 String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1) 380 .split(FileIOEngine.FILE_DELIMITER); 381 return new FileIOEngine(capacity, persistencePath != null, filePaths); 382 } else if (ioEngineName.startsWith("offheap")) { 383 return new ByteBufferIOEngine(capacity); 384 } else if (ioEngineName.startsWith("mmap:")) { 385 return new ExclusiveMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 386 } else if (ioEngineName.startsWith("pmem:")) { 387 // This mode of bucket cache creates an IOEngine over a file on the persistent memory 388 // device. Since the persistent memory device has its own address space the contents 389 // mapped to this address space does not get swapped out like in the case of mmapping 390 // on to DRAM. 
Hence the cells created out of the hfile blocks in the pmem bucket cache 391 // can be directly referred to without having to copy them onheap. Once the RPC is done, 392 // the blocks can be returned back as in case of ByteBufferIOEngine. 393 return new SharedMemoryMmapIOEngine(ioEngineName.substring(5), capacity); 394 } else { 395 throw new IllegalArgumentException( 396 "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap"); 397 } 398 } 399 400 /** 401 * Cache the block with the specified name and buffer. 402 * @param cacheKey block's cache key 403 * @param buf block buffer 404 */ 405 @Override 406 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 407 cacheBlock(cacheKey, buf, false); 408 } 409 410 /** 411 * Cache the block with the specified name and buffer. 412 * @param cacheKey block's cache key 413 * @param cachedItem block buffer 414 * @param inMemory if block is in-memory 415 */ 416 @Override 417 public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { 418 cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); 419 } 420 421 /** 422 * Cache the block to ramCache 423 * @param cacheKey block's cache key 424 * @param cachedItem block buffer 425 * @param inMemory if block is in-memory 426 * @param wait if true, blocking wait when queue is full 427 */ 428 public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, 429 boolean wait) { 430 if (cacheEnabled) { 431 if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { 432 if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) { 433 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 434 } 435 } else { 436 cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); 437 } 438 } 439 } 440 441 private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, 442 boolean inMemory, boolean wait) { 443 if 
(!cacheEnabled) { 444 return; 445 } 446 LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); 447 // Stuff the entry into the RAM cache so it can get drained to the persistent store 448 RAMQueueEntry re = 449 new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory, 450 createRecycler(cacheKey)); 451 /** 452 * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same 453 * key in ramCache, the heap size of bucket cache need to update if replacing entry from 454 * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if 455 * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct 456 * one, then the heap size will mess up (HBASE-20789) 457 */ 458 if (ramCache.putIfAbsent(cacheKey, re) != null) { 459 return; 460 } 461 int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); 462 BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum); 463 boolean successfulAddition = false; 464 if (wait) { 465 try { 466 successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); 467 } catch (InterruptedException e) { 468 Thread.currentThread().interrupt(); 469 } 470 } else { 471 successfulAddition = bq.offer(re); 472 } 473 if (!successfulAddition) { 474 ramCache.remove(cacheKey); 475 cacheStats.failInsert(); 476 } else { 477 this.blockNumber.increment(); 478 this.heapSize.add(cachedItem.heapSize()); 479 blocksByHFile.add(cacheKey); 480 } 481 } 482 483 /** 484 * Get the buffer of the block with the specified key. 
485 * @param key block's cache key 486 * @param caching true if the caller caches blocks on cache misses 487 * @param repeat Whether this is a repeat lookup for the same block 488 * @param updateCacheMetrics Whether we should update cache metrics or not 489 * @return buffer of specified cache key, or null if not in cache 490 */ 491 @Override 492 public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, 493 boolean updateCacheMetrics) { 494 if (!cacheEnabled) { 495 return null; 496 } 497 RAMQueueEntry re = ramCache.get(key); 498 if (re != null) { 499 if (updateCacheMetrics) { 500 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 501 } 502 re.access(accessCount.incrementAndGet()); 503 return re.getData(); 504 } 505 BucketEntry bucketEntry = backingMap.get(key); 506 if (bucketEntry != null) { 507 long start = System.nanoTime(); 508 ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); 509 try { 510 lock.readLock().lock(); 511 // We can not read here even if backingMap does contain the given key because its offset 512 // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check 513 // existence here. 514 if (bucketEntry.equals(backingMap.get(key))) { 515 // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the 516 // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to 517 // the same BucketEntry, then all of the three will share the same refCnt. 518 Cacheable cachedBlock = ioEngine.read(bucketEntry); 519 if (ioEngine.usesSharedMemory()) { 520 // If IOEngine use shared memory, cachedBlock and BucketEntry will share the 521 // same RefCnt, do retain here, in order to count the number of RPC references 522 cachedBlock.retain(); 523 } 524 // Update the cache statistics. 
525 if (updateCacheMetrics) { 526 cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); 527 cacheStats.ioHit(System.nanoTime() - start); 528 } 529 bucketEntry.access(accessCount.incrementAndGet()); 530 if (this.ioErrorStartTime > 0) { 531 ioErrorStartTime = -1; 532 } 533 return cachedBlock; 534 } 535 } catch (IOException ioex) { 536 LOG.error("Failed reading block " + key + " from bucket cache", ioex); 537 checkIOErrorIsTolerated(); 538 } finally { 539 lock.readLock().unlock(); 540 } 541 } 542 if (!repeat && updateCacheMetrics) { 543 cacheStats.miss(caching, key.isPrimary(), key.getBlockType()); 544 } 545 return null; 546 } 547 548 @VisibleForTesting 549 void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { 550 bucketAllocator.freeBlock(bucketEntry.offset()); 551 realCacheSize.add(-1 * bucketEntry.getLength()); 552 blocksByHFile.remove(cacheKey); 553 if (decrementBlockNumber) { 554 this.blockNumber.decrement(); 555 } 556 } 557 558 /** 559 * Try to evict the block from {@link BlockCache} by force. We'll call this in few cases:<br> 560 * 1. Close an HFile, and clear all cached blocks. <br> 561 * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br> 562 * <p> 563 * Firstly, we'll try to remove the block from RAMCache. If it doesn't exist in RAMCache, then try 564 * to evict from backingMap. Here we only need to free the reference from bucket cache by calling 565 * {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this block, block can 566 * only be de-allocated when all of them release the block. 567 * <p> 568 * NOTICE: we need to grab the write offset lock firstly before releasing the reference from 569 * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when 570 * {@link BucketCache#getBlock(BlockCacheKey, boolean, boolean, boolean)}, it's a memory leak. 
571 * @param cacheKey Block to evict 572 * @return true to indicate whether we've evicted successfully or not. 573 */ 574 @Override 575 public boolean evictBlock(BlockCacheKey cacheKey) { 576 if (!cacheEnabled) { 577 return false; 578 } 579 boolean existed = removeFromRamCache(cacheKey); 580 BucketEntry be = backingMap.get(cacheKey); 581 if (be == null) { 582 if (existed) { 583 cacheStats.evicted(0, cacheKey.isPrimary()); 584 } 585 return existed; 586 } else { 587 return be.withWriteLock(offsetLock, be::markAsEvicted); 588 } 589 } 590 591 private Recycler createRecycler(BlockCacheKey cacheKey) { 592 return () -> { 593 if (!cacheEnabled) { 594 return; 595 } 596 boolean existed = removeFromRamCache(cacheKey); 597 BucketEntry be = backingMap.get(cacheKey); 598 if (be == null && existed) { 599 cacheStats.evicted(0, cacheKey.isPrimary()); 600 } else if (be != null) { 601 be.withWriteLock(offsetLock, () -> { 602 if (backingMap.remove(cacheKey, be)) { 603 blockEvicted(cacheKey, be, !existed); 604 cacheStats.evicted(be.getCachedTime(), cacheKey.isPrimary()); 605 } 606 return null; 607 }); 608 } 609 }; 610 } 611 612 private boolean removeFromRamCache(BlockCacheKey cacheKey) { 613 return ramCache.remove(cacheKey, re -> { 614 if (re != null) { 615 this.blockNumber.decrement(); 616 this.heapSize.add(-1 * re.getData().heapSize()); 617 } 618 }); 619 } 620 621 /* 622 * Statistics thread. Periodically output cache statistics to the log. 
623 */ 624 private static class StatisticsThread extends Thread { 625 private final BucketCache bucketCache; 626 627 public StatisticsThread(BucketCache bucketCache) { 628 super("BucketCacheStatsThread"); 629 setDaemon(true); 630 this.bucketCache = bucketCache; 631 } 632 633 @Override 634 public void run() { 635 bucketCache.logStats(); 636 } 637 } 638 639 public void logStats() { 640 long totalSize = bucketAllocator.getTotalSize(); 641 long usedSize = bucketAllocator.getUsedSize(); 642 long freeSize = totalSize - usedSize; 643 long cacheSize = getRealCacheSize(); 644 LOG.info("failedBlockAdditions=" + cacheStats.getFailedInserts() + ", " + 645 "totalSize=" + StringUtils.byteDesc(totalSize) + ", " + 646 "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + 647 "usedSize=" + StringUtils.byteDesc(usedSize) +", " + 648 "cacheSize=" + StringUtils.byteDesc(cacheSize) +", " + 649 "accesses=" + cacheStats.getRequestCount() + ", " + 650 "hits=" + cacheStats.getHitCount() + ", " + 651 "IOhitsPerSecond=" + cacheStats.getIOHitsPerSecond() + ", " + 652 "IOTimePerHit=" + String.format("%.2f", cacheStats.getIOTimePerHit())+ ", " + 653 "hitRatio=" + (cacheStats.getHitCount() == 0 ? "0," : 654 (StringUtils.formatPercent(cacheStats.getHitRatio(), 2)+ ", ")) + 655 "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + 656 "cachingHits=" + cacheStats.getHitCachingCount() + ", " + 657 "cachingHitsRatio=" +(cacheStats.getHitCachingCount() == 0 ? 
"0," : 658 (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2)+ ", ")) + 659 "evictions=" + cacheStats.getEvictionCount() + ", " + 660 "evicted=" + cacheStats.getEvictedCount() + ", " + 661 "evictedPerRun=" + cacheStats.evictedPerEviction()); 662 cacheStats.reset(); 663 } 664 665 public long getRealCacheSize() { 666 return this.realCacheSize.sum(); 667 } 668 669 public long acceptableSize() { 670 return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor); 671 } 672 673 @VisibleForTesting 674 long getPartitionSize(float partitionFactor) { 675 return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor); 676 } 677 678 /** 679 * Return the count of bucketSizeinfos still need free space 680 */ 681 private int bucketSizesAboveThresholdCount(float minFactor) { 682 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 683 int fullCount = 0; 684 for (int i = 0; i < stats.length; i++) { 685 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 686 freeGoal = Math.max(freeGoal, 1); 687 if (stats[i].freeCount() < freeGoal) { 688 fullCount++; 689 } 690 } 691 return fullCount; 692 } 693 694 /** 695 * This method will find the buckets that are minimally occupied 696 * and are not reference counted and will free them completely 697 * without any constraint on the access times of the elements, 698 * and as a process will completely free at most the number of buckets 699 * passed, sometimes it might not due to changing refCounts 700 * 701 * @param completelyFreeBucketsNeeded number of buckets to free 702 **/ 703 private void freeEntireBuckets(int completelyFreeBucketsNeeded) { 704 if (completelyFreeBucketsNeeded != 0) { 705 // First we will build a set where the offsets are reference counted, usually 706 // this set is small around O(Handler Count) unless something else is wrong 707 Set<Integer> inUseBuckets = new HashSet<>(); 708 backingMap.forEach((k, be) -> { 709 if 
(be.isRpcRef()) { 710 inUseBuckets.add(bucketAllocator.getBucketIndex(be.offset())); 711 } 712 }); 713 Set<Integer> candidateBuckets = 714 bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); 715 for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) { 716 if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { 717 entry.getValue().withWriteLock(offsetLock, entry.getValue()::markStaleAsEvicted); 718 } 719 } 720 } 721 } 722 723 /** 724 * Free the space if the used size reaches acceptableSize() or one size block 725 * couldn't be allocated. When freeing the space, we use the LRU algorithm and 726 * ensure there must be some blocks evicted 727 * @param why Why we are being called 728 */ 729 private void freeSpace(final String why) { 730 // Ensure only one freeSpace progress at a time 731 if (!freeSpaceLock.tryLock()) { 732 return; 733 } 734 try { 735 freeInProgress = true; 736 long bytesToFreeWithoutExtra = 0; 737 // Calculate free byte for each bucketSizeinfo 738 StringBuilder msgBuffer = LOG.isDebugEnabled()? 
new StringBuilder(): null; 739 BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics(); 740 long[] bytesToFreeForBucket = new long[stats.length]; 741 for (int i = 0; i < stats.length; i++) { 742 bytesToFreeForBucket[i] = 0; 743 long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - minFactor)); 744 freeGoal = Math.max(freeGoal, 1); 745 if (stats[i].freeCount() < freeGoal) { 746 bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount()); 747 bytesToFreeWithoutExtra += bytesToFreeForBucket[i]; 748 if (msgBuffer != null) { 749 msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")=" 750 + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", "); 751 } 752 } 753 } 754 if (msgBuffer != null) { 755 msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", "); 756 } 757 758 if (bytesToFreeWithoutExtra <= 0) { 759 return; 760 } 761 long currentSize = bucketAllocator.getUsedSize(); 762 long totalSize = bucketAllocator.getTotalSize(); 763 if (LOG.isDebugEnabled() && msgBuffer != null) { 764 LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + 765 " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + 766 StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize)); 767 } 768 769 long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra 770 * (1 + extraFreeFactor)); 771 772 // Instantiate priority buckets 773 BucketEntryGroup bucketSingle = new BucketEntryGroup(bytesToFreeWithExtra, 774 blockSize, getPartitionSize(singleFactor)); 775 BucketEntryGroup bucketMulti = new BucketEntryGroup(bytesToFreeWithExtra, 776 blockSize, getPartitionSize(multiFactor)); 777 BucketEntryGroup bucketMemory = new BucketEntryGroup(bytesToFreeWithExtra, 778 blockSize, getPartitionSize(memoryFactor)); 779 780 // Scan entire map putting bucket entry into appropriate bucket entry 781 // group 782 for 
(Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) { 783 switch (bucketEntryWithKey.getValue().getPriority()) { 784 case SINGLE: { 785 bucketSingle.add(bucketEntryWithKey); 786 break; 787 } 788 case MULTI: { 789 bucketMulti.add(bucketEntryWithKey); 790 break; 791 } 792 case MEMORY: { 793 bucketMemory.add(bucketEntryWithKey); 794 break; 795 } 796 } 797 } 798 799 PriorityQueue<BucketEntryGroup> bucketQueue = new PriorityQueue<>(3, 800 Comparator.comparingLong(BucketEntryGroup::overflow)); 801 802 bucketQueue.add(bucketSingle); 803 bucketQueue.add(bucketMulti); 804 bucketQueue.add(bucketMemory); 805 806 int remainingBuckets = bucketQueue.size(); 807 long bytesFreed = 0; 808 809 BucketEntryGroup bucketGroup; 810 while ((bucketGroup = bucketQueue.poll()) != null) { 811 long overflow = bucketGroup.overflow(); 812 if (overflow > 0) { 813 long bucketBytesToFree = Math.min(overflow, 814 (bytesToFreeWithoutExtra - bytesFreed) / remainingBuckets); 815 bytesFreed += bucketGroup.free(bucketBytesToFree); 816 } 817 remainingBuckets--; 818 } 819 820 // Check and free if there are buckets that still need freeing of space 821 if (bucketSizesAboveThresholdCount(minFactor) > 0) { 822 bucketQueue.clear(); 823 remainingBuckets = 3; 824 825 bucketQueue.add(bucketSingle); 826 bucketQueue.add(bucketMulti); 827 bucketQueue.add(bucketMemory); 828 829 while ((bucketGroup = bucketQueue.poll()) != null) { 830 long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets; 831 bytesFreed += bucketGroup.free(bucketBytesToFree); 832 remainingBuckets--; 833 } 834 } 835 836 // Even after the above free we might still need freeing because of the 837 // De-fragmentation of the buckets (also called Slab Calcification problem), i.e 838 // there might be some buckets where the occupancy is very sparse and thus are not 839 // yielding the free for the other bucket sizes, the fix for this to evict some 840 // of the buckets, we do this by evicting the 
buckets that are least fulled 841 freeEntireBuckets(DEFAULT_FREE_ENTIRE_BLOCK_FACTOR * 842 bucketSizesAboveThresholdCount(1.0f)); 843 844 if (LOG.isDebugEnabled()) { 845 long single = bucketSingle.totalSize(); 846 long multi = bucketMulti.totalSize(); 847 long memory = bucketMemory.totalSize(); 848 if (LOG.isDebugEnabled()) { 849 LOG.debug("Bucket cache free space completed; " + "freed=" 850 + StringUtils.byteDesc(bytesFreed) + ", " + "total=" 851 + StringUtils.byteDesc(totalSize) + ", " + "single=" 852 + StringUtils.byteDesc(single) + ", " + "multi=" 853 + StringUtils.byteDesc(multi) + ", " + "memory=" 854 + StringUtils.byteDesc(memory)); 855 } 856 } 857 858 } catch (Throwable t) { 859 LOG.warn("Failed freeing space", t); 860 } finally { 861 cacheStats.evict(); 862 freeInProgress = false; 863 freeSpaceLock.unlock(); 864 } 865 } 866 867 // This handles flushing the RAM cache to IOEngine. 868 @VisibleForTesting 869 class WriterThread extends Thread { 870 private final BlockingQueue<RAMQueueEntry> inputQueue; 871 private volatile boolean writerEnabled = true; 872 873 WriterThread(BlockingQueue<RAMQueueEntry> queue) { 874 super("BucketCacheWriterThread"); 875 this.inputQueue = queue; 876 } 877 878 // Used for test 879 @VisibleForTesting 880 void disableWriter() { 881 this.writerEnabled = false; 882 } 883 884 @Override 885 public void run() { 886 List<RAMQueueEntry> entries = new ArrayList<>(); 887 try { 888 while (cacheEnabled && writerEnabled) { 889 try { 890 try { 891 // Blocks 892 entries = getRAMQueueEntries(inputQueue, entries); 893 } catch (InterruptedException ie) { 894 if (!cacheEnabled || !writerEnabled) { 895 break; 896 } 897 } 898 doDrain(entries); 899 } catch (Exception ioe) { 900 LOG.error("WriterThread encountered error", ioe); 901 } 902 } 903 } catch (Throwable t) { 904 LOG.warn("Failed doing drain", t); 905 } 906 LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); 907 } 908 909 /** 910 * Put the new bucket entry into backingMap. 
Notice that we are allowed to replace the existing 911 * cache with a new block for the same cache key. there's a corner case: one thread cache a 912 * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another 913 * new block with the same cache key do the same thing for the same cache key, so if not evict 914 * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone 915 * but the bucketAllocator do not free its memory. 916 * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey 917 * cacheKey, Cacheable newBlock) 918 * @param key Block cache key 919 * @param bucketEntry Bucket entry to put into backingMap. 920 */ 921 private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { 922 BucketEntry previousEntry = backingMap.put(key, bucketEntry); 923 if (previousEntry != null && previousEntry != bucketEntry) { 924 previousEntry.withWriteLock(offsetLock, () -> { 925 blockEvicted(key, previousEntry, false); 926 return null; 927 }); 928 } 929 } 930 931 /** 932 * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. 933 * Process all that are passed in even if failure being sure to remove from ramCache else we'll 934 * never undo the references and we'll OOME. 935 * @param entries Presumes list passed in here will be processed by this invocation only. No 936 * interference expected. 937 * @throws InterruptedException 938 */ 939 @VisibleForTesting 940 void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException { 941 if (entries.isEmpty()) { 942 return; 943 } 944 // This method is a little hard to follow. We run through the passed in entries and for each 945 // successful add, we add a non-null BucketEntry to the below bucketEntries. 
Later we must 946 // do cleanup making sure we've cleared ramCache of all entries regardless of whether we 947 // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by 948 // filling ramCache. We do the clean up by again running through the passed in entries 949 // doing extra work when we find a non-null bucketEntries corresponding entry. 950 final int size = entries.size(); 951 BucketEntry[] bucketEntries = new BucketEntry[size]; 952 // Index updated inside loop if success or if we can't succeed. We retry if cache is full 953 // when we go to add an entry by going around the loop again without upping the index. 954 int index = 0; 955 while (cacheEnabled && index < size) { 956 RAMQueueEntry re = null; 957 try { 958 re = entries.get(index); 959 if (re == null) { 960 LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); 961 index++; 962 continue; 963 } 964 BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize); 965 // Successfully added. Up index and add bucketEntry. Clear io exceptions. 966 bucketEntries[index] = bucketEntry; 967 if (ioErrorStartTime > 0) { 968 ioErrorStartTime = -1; 969 } 970 index++; 971 } catch (BucketAllocatorException fle) { 972 LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle); 973 // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. 974 bucketEntries[index] = null; 975 index++; 976 } catch (CacheFullException cfe) { 977 // Cache full when we tried to add. Try freeing space and then retrying (don't up index) 978 if (!freeInProgress) { 979 freeSpace("Full!"); 980 } else { 981 Thread.sleep(50); 982 } 983 } catch (IOException ioex) { 984 // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. 985 LOG.error("Failed writing to bucket cache", ioex); 986 checkIOErrorIsTolerated(); 987 } 988 } 989 990 // Make sure data pages are written on media before we update maps. 
991 try { 992 ioEngine.sync(); 993 } catch (IOException ioex) { 994 LOG.error("Failed syncing IO engine", ioex); 995 checkIOErrorIsTolerated(); 996 // Since we failed sync, free the blocks in bucket allocator 997 for (int i = 0; i < entries.size(); ++i) { 998 if (bucketEntries[i] != null) { 999 bucketAllocator.freeBlock(bucketEntries[i].offset()); 1000 bucketEntries[i] = null; 1001 } 1002 } 1003 } 1004 1005 // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if 1006 // success or error. 1007 for (int i = 0; i < size; ++i) { 1008 BlockCacheKey key = entries.get(i).getKey(); 1009 // Only add if non-null entry. 1010 if (bucketEntries[i] != null) { 1011 putIntoBackingMap(key, bucketEntries[i]); 1012 } 1013 // Always remove from ramCache even if we failed adding it to the block cache above. 1014 boolean existed = ramCache.remove(key, re -> { 1015 if (re != null) { 1016 heapSize.add(-1 * re.getData().heapSize()); 1017 } 1018 }); 1019 if (!existed && bucketEntries[i] != null) { 1020 // Block should have already been evicted. Remove it and free space. 1021 final BucketEntry bucketEntry = bucketEntries[i]; 1022 bucketEntry.withWriteLock(offsetLock, () -> { 1023 if (backingMap.remove(key, bucketEntry)) { 1024 blockEvicted(key, bucketEntry, false); 1025 } 1026 return null; 1027 }); 1028 } 1029 } 1030 1031 long used = bucketAllocator.getUsedSize(); 1032 if (used > acceptableSize()) { 1033 freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); 1034 } 1035 return; 1036 } 1037 } 1038 1039 /** 1040 * Blocks until elements available in {@code q} then tries to grab as many as possible before 1041 * returning. 1042 * @param receptacle Where to stash the elements taken from queue. We clear before we use it just 1043 * in case. 1044 * @param q The queue to take from. 1045 * @return {@code receptacle} laden with elements taken from the queue or empty if none found. 
1046 */ 1047 @VisibleForTesting 1048 static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q, 1049 List<RAMQueueEntry> receptacle) throws InterruptedException { 1050 // Clear sets all entries to null and sets size to 0. We retain allocations. Presume it 1051 // ok even if list grew to accommodate thousands. 1052 receptacle.clear(); 1053 receptacle.add(q.take()); 1054 q.drainTo(receptacle); 1055 return receptacle; 1056 } 1057 1058 /** 1059 * @see #retrieveFromFile(int[]) 1060 */ 1061 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="OBL_UNSATISFIED_OBLIGATION", 1062 justification = "false positive, try-with-resources ensures close is called.") 1063 private void persistToFile() throws IOException { 1064 assert !cacheEnabled; 1065 if (!ioEngine.isPersistent()) { 1066 throw new IOException("Attempt to persist non-persistent cache mappings!"); 1067 } 1068 try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) { 1069 fos.write(ProtobufMagic.PB_MAGIC); 1070 BucketProtoUtils.toPB(this).writeDelimitedTo(fos); 1071 } 1072 } 1073 1074 /** 1075 * @see #persistToFile() 1076 */ 1077 private void retrieveFromFile(int[] bucketSizes) throws IOException { 1078 File persistenceFile = new File(persistencePath); 1079 if (!persistenceFile.exists()) { 1080 return; 1081 } 1082 assert !cacheEnabled; 1083 1084 try (FileInputStream in = deleteFileOnClose(persistenceFile)) { 1085 int pblen = ProtobufMagic.lengthOfPBMagic(); 1086 byte[] pbuf = new byte[pblen]; 1087 int read = in.read(pbuf); 1088 if (read != pblen) { 1089 throw new IOException("Incorrect number of bytes read while checking for protobuf magic " 1090 + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath); 1091 } 1092 if (! ProtobufMagic.isPBMagicPrefix(pbuf)) { 1093 // In 3.0 we have enough flexibility to dump the old cache data. 
1094 // TODO: In 2.x line, this might need to be filled in to support reading the old format 1095 throw new IOException("Persistence file does not start with protobuf magic number. " + 1096 persistencePath); 1097 } 1098 parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); 1099 bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); 1100 blockNumber.add(backingMap.size()); 1101 } 1102 } 1103 1104 /** 1105 * Create an input stream that deletes the file after reading it. Use in try-with-resources to 1106 * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions: 1107 * <pre> 1108 * File f = ... 1109 * try (FileInputStream fis = new FileInputStream(f)) { 1110 * // use the input stream 1111 * } finally { 1112 * if (!f.delete()) throw new IOException("failed to delete"); 1113 * } 1114 * </pre> 1115 * @param file the file to read and delete 1116 * @return a FileInputStream for the given file 1117 * @throws IOException if there is a problem creating the stream 1118 */ 1119 private FileInputStream deleteFileOnClose(final File file) throws IOException { 1120 return new FileInputStream(file) { 1121 @Override 1122 public void close() throws IOException { 1123 super.close(); 1124 if (!file.delete()) { 1125 throw new IOException("Failed deleting persistence file " + file.getAbsolutePath()); 1126 } 1127 } 1128 }; 1129 } 1130 1131 private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) 1132 throws IOException { 1133 if (capacitySize != cacheCapacity) { 1134 throw new IOException("Mismatched cache capacity:" 1135 + StringUtils.byteDesc(capacitySize) + ", expected: " 1136 + StringUtils.byteDesc(cacheCapacity)); 1137 } 1138 if (!ioEngine.getClass().getName().equals(ioclass)) { 1139 throw new IOException("Class name for IO engine mismatch: " + ioclass 1140 + ", expected:" + ioEngine.getClass().getName()); 1141 } 1142 if 
(!backingMap.getClass().getName().equals(mapclass)) { 1143 throw new IOException("Class name for cache map mismatch: " + mapclass 1144 + ", expected:" + backingMap.getClass().getName()); 1145 } 1146 } 1147 1148 private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { 1149 if (proto.hasChecksum()) { 1150 ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(), 1151 algorithm); 1152 } else { 1153 // if has not checksum, it means the persistence file is old format 1154 LOG.info("Persistent file is old format, it does not support verifying file integrity!"); 1155 } 1156 verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); 1157 backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap()); 1158 } 1159 1160 /** 1161 * Check whether we tolerate IO error this time. If the duration of IOEngine 1162 * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the 1163 * cache 1164 */ 1165 private void checkIOErrorIsTolerated() { 1166 long now = EnvironmentEdgeManager.currentTime(); 1167 // Do a single read to a local variable to avoid timing issue - HBASE-24454 1168 long ioErrorStartTimeTmp = this.ioErrorStartTime; 1169 if (ioErrorStartTimeTmp > 0) { 1170 if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) { 1171 LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration + 1172 "ms, disabling cache, please check your IOEngine"); 1173 disableCache(); 1174 } 1175 } else { 1176 this.ioErrorStartTime = now; 1177 } 1178 } 1179 1180 /** 1181 * Used to shut down the cache -or- turn it off in the case of something broken. 
1182 */ 1183 private void disableCache() { 1184 if (!cacheEnabled) return; 1185 cacheEnabled = false; 1186 ioEngine.shutdown(); 1187 this.scheduleThreadPool.shutdown(); 1188 for (int i = 0; i < writerThreads.length; ++i) writerThreads[i].interrupt(); 1189 this.ramCache.clear(); 1190 if (!ioEngine.isPersistent() || persistencePath == null) { 1191 // If persistent ioengine and a path, we will serialize out the backingMap. 1192 this.backingMap.clear(); 1193 } 1194 } 1195 1196 private void join() throws InterruptedException { 1197 for (int i = 0; i < writerThreads.length; ++i) 1198 writerThreads[i].join(); 1199 } 1200 1201 @Override 1202 public void shutdown() { 1203 disableCache(); 1204 LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() 1205 + "; path to write=" + persistencePath); 1206 if (ioEngine.isPersistent() && persistencePath != null) { 1207 try { 1208 join(); 1209 persistToFile(); 1210 } catch (IOException ex) { 1211 LOG.error("Unable to persist data on exit: " + ex.toString(), ex); 1212 } catch (InterruptedException e) { 1213 LOG.warn("Failed to persist data on exit", e); 1214 } 1215 } 1216 } 1217 1218 @Override 1219 public CacheStats getStats() { 1220 return cacheStats; 1221 } 1222 1223 public BucketAllocator getAllocator() { 1224 return this.bucketAllocator; 1225 } 1226 1227 @Override 1228 public long heapSize() { 1229 return this.heapSize.sum(); 1230 } 1231 1232 @Override 1233 public long size() { 1234 return this.realCacheSize.sum(); 1235 } 1236 1237 @Override 1238 public long getCurrentDataSize() { 1239 return size(); 1240 } 1241 1242 @Override 1243 public long getFreeSize() { 1244 return this.bucketAllocator.getFreeSize(); 1245 } 1246 1247 @Override 1248 public long getBlockCount() { 1249 return this.blockNumber.sum(); 1250 } 1251 1252 @Override 1253 public long getDataBlockCount() { 1254 return getBlockCount(); 1255 } 1256 1257 @Override 1258 public long getCurrentSize() { 1259 return this.bucketAllocator.getUsedSize(); 1260 } 
1261 1262 protected String getAlgorithm() { 1263 return algorithm; 1264 } 1265 1266 /** 1267 * Evicts all blocks for a specific HFile. 1268 * <p> 1269 * This is used for evict-on-close to remove all blocks of a specific HFile. 1270 * 1271 * @return the number of blocks evicted 1272 */ 1273 @Override 1274 public int evictBlocksByHfileName(String hfileName) { 1275 Set<BlockCacheKey> keySet = blocksByHFile.subSet( 1276 new BlockCacheKey(hfileName, Long.MIN_VALUE), true, 1277 new BlockCacheKey(hfileName, Long.MAX_VALUE), true); 1278 1279 int numEvicted = 0; 1280 for (BlockCacheKey key : keySet) { 1281 if (evictBlock(key)) { 1282 ++numEvicted; 1283 } 1284 } 1285 1286 return numEvicted; 1287 } 1288 1289 /** 1290 * Used to group bucket entries into priority buckets. There will be a 1291 * BucketEntryGroup for each priority (single, multi, memory). Once bucketed, 1292 * the eviction algorithm takes the appropriate number of elements out of each 1293 * according to configuration parameters and their relative sizes. 1294 */ 1295 private class BucketEntryGroup { 1296 1297 private CachedEntryQueue queue; 1298 private long totalSize = 0; 1299 private long bucketSize; 1300 1301 public BucketEntryGroup(long bytesToFree, long blockSize, long bucketSize) { 1302 this.bucketSize = bucketSize; 1303 queue = new CachedEntryQueue(bytesToFree, blockSize); 1304 totalSize = 0; 1305 } 1306 1307 public void add(Map.Entry<BlockCacheKey, BucketEntry> block) { 1308 totalSize += block.getValue().getLength(); 1309 queue.add(block); 1310 } 1311 1312 public long free(long toFree) { 1313 Map.Entry<BlockCacheKey, BucketEntry> entry; 1314 long freedBytes = 0; 1315 // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free 1316 // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 
1317 while ((entry = queue.pollLast()) != null) { 1318 BucketEntry be = entry.getValue(); 1319 if (be.withWriteLock(offsetLock, be::markStaleAsEvicted)) { 1320 freedBytes += be.getLength(); 1321 } 1322 if (freedBytes >= toFree) { 1323 return freedBytes; 1324 } 1325 } 1326 return freedBytes; 1327 } 1328 1329 public long overflow() { 1330 return totalSize - bucketSize; 1331 } 1332 1333 public long totalSize() { 1334 return totalSize; 1335 } 1336 } 1337 1338 /** 1339 * Block Entry stored in the memory with key,data and so on 1340 */ 1341 @VisibleForTesting 1342 static class RAMQueueEntry { 1343 private final BlockCacheKey key; 1344 private final Cacheable data; 1345 private long accessCounter; 1346 private boolean inMemory; 1347 private final Recycler recycler; 1348 1349 RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, 1350 Recycler recycler) { 1351 this.key = bck; 1352 this.data = data; 1353 this.accessCounter = accessCounter; 1354 this.inMemory = inMemory; 1355 this.recycler = recycler; 1356 } 1357 1358 public Cacheable getData() { 1359 return data; 1360 } 1361 1362 public BlockCacheKey getKey() { 1363 return key; 1364 } 1365 1366 public void access(long accessCounter) { 1367 this.accessCounter = accessCounter; 1368 } 1369 1370 private ByteBuffAllocator getByteBuffAllocator() { 1371 if (data instanceof HFileBlock) { 1372 return ((HFileBlock) data).getByteBuffAllocator(); 1373 } 1374 return ByteBuffAllocator.HEAP; 1375 } 1376 1377 public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, 1378 final LongAdder realCacheSize) throws IOException { 1379 int len = data.getSerializedLength(); 1380 // This cacheable thing can't be serialized 1381 if (len == 0) { 1382 return null; 1383 } 1384 long offset = alloc.allocateBlock(len); 1385 boolean succ = false; 1386 BucketEntry bucketEntry = null; 1387 try { 1388 bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler), 1389 
getByteBuffAllocator()); 1390 bucketEntry.setDeserializerReference(data.getDeserializer()); 1391 if (data instanceof HFileBlock) { 1392 // If an instance of HFileBlock, save on some allocations. 1393 HFileBlock block = (HFileBlock) data; 1394 ByteBuff sliceBuf = block.getBufferReadOnly(); 1395 ByteBuffer metadata = block.getMetaData(); 1396 ioEngine.write(sliceBuf, offset); 1397 ioEngine.write(metadata, offset + len - metadata.limit()); 1398 } else { 1399 // Only used for testing. 1400 ByteBuffer bb = ByteBuffer.allocate(len); 1401 data.serialize(bb, true); 1402 ioEngine.write(bb, offset); 1403 } 1404 succ = true; 1405 } finally { 1406 if (!succ) { 1407 alloc.freeBlock(offset); 1408 } 1409 } 1410 realCacheSize.add(len); 1411 return bucketEntry; 1412 } 1413 } 1414 1415 /** 1416 * Only used in test 1417 * @throws InterruptedException 1418 */ 1419 void stopWriterThreads() throws InterruptedException { 1420 for (WriterThread writerThread : writerThreads) { 1421 writerThread.disableWriter(); 1422 writerThread.interrupt(); 1423 writerThread.join(); 1424 } 1425 } 1426 1427 @Override 1428 public Iterator<CachedBlock> iterator() { 1429 // Don't bother with ramcache since stuff is in here only a little while. 1430 final Iterator<Map.Entry<BlockCacheKey, BucketEntry>> i = 1431 this.backingMap.entrySet().iterator(); 1432 return new Iterator<CachedBlock>() { 1433 private final long now = System.nanoTime(); 1434 1435 @Override 1436 public boolean hasNext() { 1437 return i.hasNext(); 1438 } 1439 1440 @Override 1441 public CachedBlock next() { 1442 final Map.Entry<BlockCacheKey, BucketEntry> e = i.next(); 1443 return new CachedBlock() { 1444 @Override 1445 public String toString() { 1446 return BlockCacheUtil.toString(this, now); 1447 } 1448 1449 @Override 1450 public BlockPriority getBlockPriority() { 1451 return e.getValue().getPriority(); 1452 } 1453 1454 @Override 1455 public BlockType getBlockType() { 1456 // Not held by BucketEntry. 
Could add it if wanted on BucketEntry creation. 1457 return null; 1458 } 1459 1460 @Override 1461 public long getOffset() { 1462 return e.getKey().getOffset(); 1463 } 1464 1465 @Override 1466 public long getSize() { 1467 return e.getValue().getLength(); 1468 } 1469 1470 @Override 1471 public long getCachedTime() { 1472 return e.getValue().getCachedTime(); 1473 } 1474 1475 @Override 1476 public String getFilename() { 1477 return e.getKey().getHfileName(); 1478 } 1479 1480 @Override 1481 public int compareTo(CachedBlock other) { 1482 int diff = this.getFilename().compareTo(other.getFilename()); 1483 if (diff != 0) return diff; 1484 1485 diff = Long.compare(this.getOffset(), other.getOffset()); 1486 if (diff != 0) return diff; 1487 if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { 1488 throw new IllegalStateException("" + this.getCachedTime() + ", " + 1489 other.getCachedTime()); 1490 } 1491 return Long.compare(other.getCachedTime(), this.getCachedTime()); 1492 } 1493 1494 @Override 1495 public int hashCode() { 1496 return e.getKey().hashCode(); 1497 } 1498 1499 @Override 1500 public boolean equals(Object obj) { 1501 if (obj instanceof CachedBlock) { 1502 CachedBlock cb = (CachedBlock)obj; 1503 return compareTo(cb) == 0; 1504 } else { 1505 return false; 1506 } 1507 } 1508 }; 1509 } 1510 1511 @Override 1512 public void remove() { 1513 throw new UnsupportedOperationException(); 1514 } 1515 }; 1516 } 1517 1518 @Override 1519 public BlockCache[] getBlockCaches() { 1520 return null; 1521 } 1522 1523 @VisibleForTesting 1524 public int getRpcRefCount(BlockCacheKey cacheKey) { 1525 BucketEntry bucketEntry = backingMap.get(cacheKey); 1526 if (bucketEntry != null) { 1527 return bucketEntry.refCnt() - (bucketEntry.markedAsEvicted.get() ? 
0 : 1); 1528 } 1529 return 0; 1530 } 1531 1532 float getAcceptableFactor() { 1533 return acceptableFactor; 1534 } 1535 1536 float getMinFactor() { 1537 return minFactor; 1538 } 1539 1540 float getExtraFreeFactor() { 1541 return extraFreeFactor; 1542 } 1543 1544 float getSingleFactor() { 1545 return singleFactor; 1546 } 1547 1548 float getMultiFactor() { 1549 return multiFactor; 1550 } 1551 1552 float getMemoryFactor() { 1553 return memoryFactor; 1554 } 1555 1556 /** 1557 * Wrapped the delegate ConcurrentMap with maintaining its block's reference count. 1558 */ 1559 static class RAMCache { 1560 /** 1561 * Defined the map as {@link ConcurrentHashMap} explicitly here, because in 1562 * {@link RAMCache#get(BlockCacheKey)} and 1563 * {@link RAMCache#putIfAbsent(BlockCacheKey, BucketCache.RAMQueueEntry)} , we need to 1564 * guarantee the atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). 1565 * Besides, the func method can execute exactly once only when the key is present(or absent) 1566 * and under the lock context. Otherwise, the reference count of block will be messed up. 1567 * Notice that the {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. 1568 */ 1569 final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>(); 1570 1571 public boolean containsKey(BlockCacheKey key) { 1572 return delegate.containsKey(key); 1573 } 1574 1575 public RAMQueueEntry get(BlockCacheKey key) { 1576 return delegate.computeIfPresent(key, (k, re) -> { 1577 // It'll be referenced by RPC, so retain atomically here. if the get and retain is not 1578 // atomic, another thread may remove and release the block, when retaining in this thread we 1579 // may retain a block with refCnt=0 which is disallowed. (see HBASE-22422) 1580 re.getData().retain(); 1581 return re; 1582 }); 1583 } 1584 1585 /** 1586 * Return the previous associated value, or null if absent. 
It has the same meaning as 1587 * {@link ConcurrentMap#putIfAbsent(Object, Object)} 1588 */ 1589 public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) { 1590 AtomicBoolean absent = new AtomicBoolean(false); 1591 RAMQueueEntry re = delegate.computeIfAbsent(key, k -> { 1592 // The RAMCache reference to this entry, so reference count should be increment. 1593 entry.getData().retain(); 1594 absent.set(true); 1595 return entry; 1596 }); 1597 return absent.get() ? null : re; 1598 } 1599 1600 public boolean remove(BlockCacheKey key) { 1601 return remove(key, re->{}); 1602 } 1603 1604 /** 1605 * Defined an {@link Consumer} here, because once the removed entry release its reference count, 1606 * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an 1607 * exception. the consumer will access entry to remove before release its reference count. 1608 * Notice, don't change its reference count in the {@link Consumer} 1609 */ 1610 public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { 1611 RAMQueueEntry previous = delegate.remove(key); 1612 action.accept(previous); 1613 if (previous != null) { 1614 previous.getData().release(); 1615 } 1616 return previous != null; 1617 } 1618 1619 public boolean isEmpty() { 1620 return delegate.isEmpty(); 1621 } 1622 1623 public void clear() { 1624 Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator(); 1625 while (it.hasNext()) { 1626 RAMQueueEntry re = it.next().getValue(); 1627 it.remove(); 1628 re.getData().release(); 1629 } 1630 } 1631 } 1632}