@InterfaceAudience.Private public class BucketCache extends Object implements BlockCache, HeapSize
BucketCache uses BucketAllocator to allocate/free blocks, and uses BucketCache#ramCache
and BucketCache#backingMap in order to determine if a given element is in the cache. The bucket
cache can store and read block data using off-heap memory (ByteBufferIOEngine), mmap
(ExclusiveMemoryMmapIOEngine), pmem (SharedMemoryMmapIOEngine), or local files (FileIOEngine).
Eviction uses an algorithm similar to the one used in LruBlockCache.
BucketCache can be used mainly as a block cache (see CombinedBlockCache), combined with a BlockCache to
decrease CMS GC and heap fragmentation.
It can also be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to enlarge cache space via a victim cache.
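The role of the two maps in the description above can be illustrated with a small stand-alone sketch. This is not the HBase implementation: the class `TwoTierLookupSketch`, its String keys, and the in-memory map standing in for the IOEngine are all hypothetical, and the lookup order (staging map first, then the location map) is an assumption made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a two-tier lookup; not the HBase implementation. */
final class TwoTierLookupSketch {
  // Blocks accepted by the cache but not yet written out (the ramCache role).
  private final Map<String, byte[]> ramCache = new ConcurrentHashMap<>();
  // Location of each block already written out (the backingMap role).
  private final Map<String, Long> backingMap = new ConcurrentHashMap<>();
  // Assumed stand-in for an IOEngine: offset -> stored bytes.
  private final Map<Long, byte[]> ioEngine = new ConcurrentHashMap<>();
  private long nextOffset = 0;

  void cacheBlock(String key, byte[] data) {
    ramCache.putIfAbsent(key, data);
  }

  /** Moves one staged block to the backing store, as a writer thread might. */
  synchronized void drain(String key) {
    byte[] data = ramCache.get(key);
    if (data == null) {
      return;
    }
    long offset = nextOffset;
    nextOffset += data.length;
    ioEngine.put(offset, data);
    backingMap.put(key, offset); // publish the location first
    ramCache.remove(key);        // then drop the staged copy
  }

  /** Looks in the staging map first, then in the backing store. */
  byte[] getBlock(String key) {
    byte[] staged = ramCache.get(key);
    if (staged != null) {
      return staged;
    }
    Long offset = backingMap.get(key);
    return offset == null ? null : ioEngine.get(offset);
  }

  boolean contains(String key) {
    return ramCache.containsKey(key) || backingMap.containsKey(key);
  }
}
```

Publishing the location before removing the staged copy means a concurrent reader should find the block in at least one of the two maps during the hand-off.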
| Modifier and Type | Class and Description |
|---|---|
| private class | BucketCache.BucketEntryGroup: Used to group bucket entries into priority buckets. |
| (package private) static class | BucketCache.RAMCache: Wraps the delegate ConcurrentMap while maintaining each block's reference count. |
| (package private) static class | BucketCache.RAMQueueEntry: Block entry stored in memory, with key, data and so on. |
| private static class | BucketCache.StatisticsThread |
| (package private) class | BucketCache.WriterThread |
| Modifier and Type | Field and Description |
|---|---|
| (package private) static String | ACCEPT_FACTOR_CONFIG_NAME |
| private float | acceptableFactor: Acceptable size of cache (no evictions if size < acceptable) |
| private AtomicLong | accessCount: Cache access count (sequential ID) |
| private String | algorithm: Uses one of the MessageDigest class's digest algorithms to check persistent file integrity; the default algorithm is MD5. |
| private static int | ALLOCATION_FAIL_LOG_TIME_PERIOD |
| private long | allocFailLogPrevTs |
| (package private) ConcurrentHashMap<BlockCacheKey,BucketEntry> | backingMap |
| private LongAdder | blockNumber: Current number of cached elements |
| private NavigableSet<BlockCacheKey> | blocksByHFile |
| private long | blockSize: Approximate block size |
| private BucketAllocator | bucketAllocator |
| private long | cacheCapacity |
| private boolean | cacheEnabled: Flag indicating whether the cache is enabled. |
| private BucketCacheStats | cacheStats |
| private static float | DEFAULT_ACCEPT_FACTOR |
| private static int | DEFAULT_CACHE_WAIT_TIME |
| static int | DEFAULT_ERROR_TOLERATION_DURATION |
| private static float | DEFAULT_EXTRA_FREE_FACTOR |
| private static String | DEFAULT_FILE_VERIFY_ALGORITHM |
| private static int | DEFAULT_FREE_ENTIRE_BLOCK_FACTOR |
| (package private) static float | DEFAULT_MEMORY_FACTOR |
| (package private) static float | DEFAULT_MIN_FACTOR |
| (package private) static float | DEFAULT_MULTI_FACTOR |
| (package private) static float | DEFAULT_SINGLE_FACTOR: Priority buckets |
| (package private) static int | DEFAULT_WRITER_QUEUE_ITEMS |
| (package private) static int | DEFAULT_WRITER_THREADS |
| (package private) static String | EXTRA_FREE_FACTOR_CONFIG_NAME |
| private float | extraFreeFactor: Free this floating point factor of extra blocks when evicting. |
| private static String | FILE_VERIFY_ALGORITHM |
| private boolean | freeInProgress: Volatile boolean to track whether freeing space is in progress |
| private Lock | freeSpaceLock |
| private LongAdder | heapSize |
| (package private) IOEngine | ioEngine |
| private long | ioErrorStartTime |
| private int | ioErrorsTolerationDuration: Duration of IO errors tolerated before we disable the cache; 1 min by default |
| private static org.slf4j.Logger | LOG |
| (package private) static String | MEMORY_FACTOR_CONFIG_NAME |
| private float | memoryFactor: In-memory bucket size |
| (package private) static String | MIN_FACTOR_CONFIG_NAME |
| private float | minFactor: Minimum threshold of cache (when evicting, evict until size < min) |
| (package private) static String | MULTI_FACTOR_CONFIG_NAME |
| private float | multiFactor: Multiple access bucket size |
| (package private) IdReadWriteLock<Long> | offsetLock: A ReentrantReadWriteLock to lock on a particular block identified by offset. |
| private String | persistencePath |
| (package private) BucketCache.RAMCache | ramCache |
| private LongAdder | realCacheSize |
| private ScheduledExecutorService | scheduleThreadPool: Statistics thread schedule pool (for heavy debugging, could remove) |
| (package private) static String | SINGLE_FACTOR_CONFIG_NAME: Priority buckets config |
| private float | singleFactor: Single access bucket size |
| private static int | statThreadPeriod: Statistics thread |
| (package private) boolean | wait_when_cache: Used in tests. |
| (package private) ArrayList<BlockingQueue<BucketCache.RAMQueueEntry>> | writerQueues: A list of writer queues. |
| (package private) BucketCache.WriterThread[] | writerThreads |
| Constructor and Description |
|---|
| BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) |
| BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, org.apache.hadoop.conf.Configuration conf) |
| Modifier and Type | Method and Description |
|---|---|
| long | acceptableSize() |
| (package private) void | blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, boolean evictedByEvictionProcess): This method is invoked after the bucketEntry is removed from backingMap. |
| private int | bucketSizesAboveThresholdCount(float minFactor): Returns the count of bucketSizeinfos that still need free space. |
| void | cacheBlock(BlockCacheKey cacheKey, Cacheable buf): Caches the block with the specified name and buffer. |
| void | cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory): Caches the block with the specified name and buffer. |
| void | cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait): Caches the block to ramCache. |
| protected void | cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) |
| private void | checkIOErrorIsTolerated(): Checks whether we tolerate the IO error this time. |
| private ByteBuffAllocator.Recycler | createRecycler(BucketEntry bucketEntry): Creates the ByteBuffAllocator.Recycler for BucketEntry.refCnt, which is used as the RefCnt.recycler of the HFileBlock.buf returned from getBlock(org.apache.hadoop.hbase.io.hfile.BlockCacheKey, boolean, boolean, boolean). |
| private FileInputStream | deleteFileOnClose(File file): Creates an input stream that deletes the file after reading it. |
| private void | disableCache(): Used to shut down the cache, or to turn it off when something is broken. |
| (package private) void | doDrain(List<BucketCache.RAMQueueEntry> entries, ByteBuffer metaBuff): Flushes the entries in ramCache to the IOEngine and adds a bucket entry to backingMap. |
| private boolean | doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean evictedByEvictionProcess) |
| boolean | evictBlock(BlockCacheKey cacheKey): Tries to evict the block from BlockCache by force. |
| boolean | evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey): NOTE: This method is only for tests. |
| int | evictBlocksByHfileName(String hfileName): Evicts all blocks for a specific HFile. |
| (package private) boolean | evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) |
| (package private) void | freeBucketEntry(BucketEntry bucketEntry): Actually frees the BucketEntry; this can only be invoked once BucketEntry.refCnt reaches 0. |
| private void | freeEntireBuckets(int completelyFreeBucketsNeeded): Finds the buckets that are minimally occupied and not reference counted and frees them completely, without any constraint on the access times of the elements. It completely frees at most the number of buckets passed; sometimes it frees fewer because of changing refCounts. |
| (package private) void | freeSpace(String why): Frees space if the used size reaches acceptableSize() or a block of some size could not be allocated. |
| (package private) float | getAcceptableFactor() |
| protected String | getAlgorithm() |
| private static String | getAllocationFailWarningMessage(BucketAllocatorException fle, BucketCache.RAMQueueEntry re): Prepares and returns a warning message for a BucketAllocatorException. |
| BucketAllocator | getAllocator() |
| Cacheable | getBlock(BlockCacheKey key, boolean caching, boolean repeat, boolean updateCacheMetrics): Gets the buffer of the block with the specified key. |
| BlockCache[] | getBlockCaches(): Returns the list of sub block caches that make up this one; returns null if there are no sub caches. |
| long | getBlockCount(): Returns the number of blocks currently cached in the block cache. |
| long | getCurrentDataSize(): Returns the occupied size of data blocks, in bytes. |
| long | getCurrentSize(): Returns the occupied size of the block cache, in bytes. |
| long | getDataBlockCount(): Returns the number of data blocks currently cached in the block cache. |
| (package private) float | getExtraFreeFactor() |
| long | getFreeSize(): Returns the free size of the block cache, in bytes. |
| String | getIoEngine() |
| private IOEngine | getIOEngineFromName(String ioEngineName, long capacity, String persistencePath): Gets the IOEngine from the IO engine name. |
| long | getMaxSize(): Returns the max size of the block cache, in bytes. |
| (package private) float | getMemoryFactor() |
| (package private) float | getMinFactor() |
| (package private) float | getMultiFactor() |
| (package private) long | getPartitionSize(float partitionFactor) |
| (package private) static List<BucketCache.RAMQueueEntry> | getRAMQueueEntries(BlockingQueue<BucketCache.RAMQueueEntry> q, List<BucketCache.RAMQueueEntry> receptacle): Blocks until elements are available in q, then tries to grab as many as possible before returning. |
| long | getRealCacheSize() |
| int | getRpcRefCount(BlockCacheKey cacheKey) |
| (package private) float | getSingleFactor() |
| CacheStats | getStats(): Gets the statistics for this block cache. |
| long | heapSize(): Returns the approximate 'exclusive deep size' of the implementing object. |
| (package private) boolean | isCacheEnabled() |
| Iterator<CachedBlock> | iterator(): Returns an Iterator over the blocks in the cache. |
| private void | join() |
| void | logStats() |
| private void | parsePB(org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos.BucketCacheEntry proto) |
| private void | persistToFile() |
| protected void | putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry): Puts the new bucket entry into backingMap. |
| protected boolean | removeFromRamCache(BlockCacheKey cacheKey) |
| private void | retrieveFromFile(int[] bucketSizes) |
| private void | sanityCheckConfigs() |
| protected boolean | shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) |
| void | shutdown(): Shuts down the cache. |
| long | size(): Returns the total size of the block cache, in bytes. |
| protected void | startWriterThreads(): Called by the constructor to start the writer threads. |
| (package private) void | stopWriterThreads(): Only used in tests. |
| private void | verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
isMetaBlock
forEach, spliterator
private static final org.slf4j.Logger LOG
static final String SINGLE_FACTOR_CONFIG_NAME
static final String MULTI_FACTOR_CONFIG_NAME
static final String MEMORY_FACTOR_CONFIG_NAME
static final String EXTRA_FREE_FACTOR_CONFIG_NAME
static final String ACCEPT_FACTOR_CONFIG_NAME
static final String MIN_FACTOR_CONFIG_NAME
static final float DEFAULT_SINGLE_FACTOR
static final float DEFAULT_MULTI_FACTOR
static final float DEFAULT_MEMORY_FACTOR
static final float DEFAULT_MIN_FACTOR
private static final float DEFAULT_EXTRA_FREE_FACTOR
private static final float DEFAULT_ACCEPT_FACTOR
private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR
private static final int statThreadPeriod
static final int DEFAULT_WRITER_THREADS
static final int DEFAULT_WRITER_QUEUE_ITEMS
final transient BucketCache.RAMCache ramCache
transient ConcurrentHashMap<BlockCacheKey,BucketEntry> backingMap
private volatile boolean cacheEnabled
final transient ArrayList<BlockingQueue<BucketCache.RAMQueueEntry>> writerQueues
A list of writer queues. We have a queue per BucketCache.WriterThread we have running. In other
words, the work of adding blocks to the BucketCache is divided up amongst the running
WriterThreads. This is done by taking the hash of the cache key modulo the queue count. When a
WriterThread runs, it takes whatever has been recently added and 'drains' the entries to the BucketCache. It
then updates the ramCache and backingMap accordingly.
final transient BucketCache.WriterThread[] writerThreads
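The queue selection rule described for writerQueues (hash of the cache key modulo the queue count) and the 'drain' step can be sketched as follows. This is a minimal illustration, not the HBase code: `WriterQueueSketch`, `queueIndexFor`, `enqueue`, and `takeBatch` are hypothetical names, plain String keys stand in for BucketCache.RAMQueueEntry, and masking the sign bit of the hash is an assumption about how a negative hashCode would be handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Illustrative only: String keys stand in for RAMQueueEntry. */
final class WriterQueueSketch {
  // One queue per writer thread.
  private final List<BlockingQueue<String>> writerQueues = new ArrayList<>();

  WriterQueueSketch(int writerThreadCount, int queueCapacity) {
    for (int i = 0; i < writerThreadCount; i++) {
      writerQueues.add(new ArrayBlockingQueue<>(queueCapacity));
    }
  }

  /** Hash of the cache key modulo the queue count. */
  int queueIndexFor(Object cacheKey) {
    // Assumption: mask the sign bit so a negative hashCode cannot yield a negative index.
    return (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
  }

  /** Non-blocking enqueue; returns false when the chosen queue is full. */
  boolean enqueue(String cacheKey) {
    return writerQueues.get(queueIndexFor(cacheKey)).offer(cacheKey);
  }

  BlockingQueue<String> queue(int index) {
    return writerQueues.get(index);
  }

  /**
   * Blocks until one element is available, then grabs whatever else is already queued.
   * Unlike a method that declares InterruptedException, this sketch restores the
   * interrupt flag and returns what it has collected so far.
   */
  static List<String> takeBatch(BlockingQueue<String> q, List<String> receptacle) {
    receptacle.clear();
    try {
      receptacle.add(q.take());
      q.drainTo(receptacle);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return receptacle;
  }
}
```

Because the index depends only on the key's hash, the same key always lands in the same queue, so a single writer thread handles all writes for that key.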
private volatile boolean freeInProgress
private final transient Lock freeSpaceLock
private final LongAdder realCacheSize
private final LongAdder blockNumber
private final AtomicLong accessCount
private static final int DEFAULT_CACHE_WAIT_TIME
boolean wait_when_cache
private final BucketCacheStats cacheStats
private final String persistencePath
private final long cacheCapacity
private final long blockSize
private final int ioErrorsTolerationDuration
public static final int DEFAULT_ERROR_TOLERATION_DURATION
private volatile long ioErrorStartTime
final transient IdReadWriteLock<Long> offsetLock
A ReentrantReadWriteLock to lock on a particular block identified by offset. The key set of offsets in BucketCache is limited, so a soft reference is the best choice here.
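The idea of one read/write lock per block offset can be sketched as below. This is a simplified illustration under stated assumptions, not IdReadWriteLock: `OffsetLockSketch` and its methods are hypothetical names, and the sketch keeps every lock in a ConcurrentHashMap forever instead of using the soft-reference pooling mentioned above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/** Illustrative per-offset lock table; omits soft-reference reclamation. */
final class OffsetLockSketch {
  private final ConcurrentHashMap<Long, ReentrantReadWriteLock> locks =
      new ConcurrentHashMap<>();

  /** Returns the single lock associated with the given block offset. */
  ReentrantReadWriteLock getLock(long offset) {
    return locks.computeIfAbsent(offset, k -> new ReentrantReadWriteLock());
  }

  /** Readers of the same offset may run concurrently with each other. */
  <T> T withReadLock(long offset, Supplier<T> body) {
    ReentrantReadWriteLock lock = getLock(offset);
    lock.readLock().lock();
    try {
      return body.get();
    } finally {
      lock.readLock().unlock();
    }
  }

  /** A writer (for example an eviction) excludes readers of that offset only. */
  void withWriteLock(long offset, Runnable body) {
    ReentrantReadWriteLock lock = getLock(offset);
    lock.writeLock().lock();
    try {
      body.run();
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```

Locking by offset rather than with one global lock means work on one block does not stall reads of unrelated blocks.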
private final NavigableSet<BlockCacheKey> blocksByHFile
private final transient ScheduledExecutorService scheduleThreadPool
private transient BucketAllocator bucketAllocator
private float acceptableFactor
private float minFactor
private float extraFreeFactor
private float singleFactor
private float multiFactor
private float memoryFactor
private static final String FILE_VERIFY_ALGORITHM
private static final String DEFAULT_FILE_VERIFY_ALGORITHM
private String algorithm
Uses one of the MessageDigest class's digest algorithms to check persistent file integrity; the default algorithm is MD5.
private long allocFailLogPrevTs
private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) throws IOException
Throws: IOException
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, org.apache.hadoop.conf.Configuration conf) throws IOException
Throws: IOException
private void sanityCheckConfigs()
protected void startWriterThreads()
boolean isCacheEnabled()
public long getMaxSize()
Description copied from interface: BlockCache
Returns the max size of the block cache, in bytes.
Specified by: getMaxSize in interface BlockCache
public String getIoEngine()
private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) throws IOException
Gets the IOEngine from the IO engine name.
Throws: IOException
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf)
Caches the block with the specified name and buffer.
Specified by: cacheBlock in interface BlockCache
Parameters:
cacheKey - block's cache key
buf - block buffer
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory)
Caches the block with the specified name and buffer.
Specified by: cacheBlock in interface BlockCache
Parameters:
cacheKey - block's cache key
cachedItem - block buffer
inMemory - if block is in-memory
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait)
Caches the block to ramCache.
Parameters:
cacheKey - block's cache key
cachedItem - block buffer
inMemory - if block is in-memory
wait - if true, blocking wait when queue is full
protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock)
protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait)
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, boolean updateCacheMetrics)
Gets the buffer of the block with the specified key.
Specified by: getBlock in interface BlockCache
Parameters:
key - block's cache key
caching - true if the caller caches blocks on cache misses
repeat - whether this is a repeat lookup for the same block
updateCacheMetrics - whether we should update cache metrics or not
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, boolean evictedByEvictionProcess)
This method is invoked after the bucketEntry is removed from backingMap.
void freeBucketEntry(BucketEntry bucketEntry)
Actually frees the BucketEntry; this can only be invoked once BucketEntry.refCnt reaches 0.
public boolean evictBlock(BlockCacheKey cacheKey)
Tries to evict the block from BlockCache by force. We'll call this in a few cases, for example through Admin.clearBlockCache(TableName) to clear all blocks for a given table.
First, we try to remove the block from RAMCache, and then try to evict it from backingMap.
Here we evict the block from backingMap immediately, but only free the reference from the bucket
cache by calling BucketEntry.markedAsEvicted. If some RPCs are still referring to this
block, it can only be de-allocated once all of them release it.
NOTICE: we need to grab the write offset lock before releasing the reference from the
bucket cache. If we don't, getBlock(BlockCacheKey, boolean, boolean, boolean) may read a
BucketEntry with refCnt = 0, which is a memory leak.
Specified by: evictBlock in interface BlockCache
Parameters:
cacheKey - Block to evict
private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean evictedByEvictionProcess)
Evicts the BlockCacheKey and BucketEntry from backingMap and ramCache.
NOTE: when evicting from backingMap, only the matched BlockCacheKey and BucketEntry can be removed.
Parameters:
cacheKey - BlockCacheKey to evict.
bucketEntry - BucketEntry matched BlockCacheKey to evict.
private ByteBuffAllocator.Recycler createRecycler(BucketEntry bucketEntry)
Creates the ByteBuffAllocator.Recycler for BucketEntry.refCnt, which is used as the RefCnt.recycler of the HFileBlock.buf returned from getBlock(org.apache.hadoop.hbase.io.hfile.BlockCacheKey, boolean, boolean, boolean).
NOTE: for getBlock(org.apache.hadoop.hbase.io.hfile.BlockCacheKey, boolean, boolean, boolean), the RefCnt.recycler of an HFileBlock.buf from backingMap and from ramCache are different:
1. For the RefCnt.recycler of an HFileBlock.buf from backingMap, it is the return value of this createRecycler(org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry) method.
2. For the RefCnt.recycler of an HFileBlock.buf from ramCache, it is ByteBuffAllocator.putbackBuffer(java.nio.ByteBuffer).
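The deferred de-allocation described for evictBlock, where eviction only drops the cache's own reference and the recycler runs once the last RPC reference is released, can be illustrated with a small reference-counting sketch. All names here (`RefCountedEntrySketch`, `retain`, `release`, `markAsEvicted`) are hypothetical, and the starting count of 1 for the cache's own reference is an assumption; this is not the BucketEntry or RefCnt implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative reference-counted cache entry with a recycler callback. */
final class RefCountedEntrySketch {
  // Assumption: starts at 1, the reference held by the cache itself.
  private final AtomicInteger refCnt = new AtomicInteger(1);
  private final AtomicBoolean markedAsEvicted = new AtomicBoolean(false);
  private final Runnable recycler;

  RefCountedEntrySketch(Runnable recycler) {
    this.recycler = recycler;
  }

  /** Takes a reference for a reader; fails if the entry was already freed. */
  boolean retain() {
    for (;;) {
      int current = refCnt.get();
      if (current == 0) {
        return false;
      }
      if (refCnt.compareAndSet(current, current + 1)) {
        return true;
      }
    }
  }

  /** Drops one reference; runs the recycler when the count reaches 0. */
  boolean release() {
    int remaining = refCnt.decrementAndGet();
    if (remaining < 0) {
      throw new IllegalStateException("released more often than retained");
    }
    if (remaining == 0) {
      recycler.run();
      return true;
    }
    return false;
  }

  /** Drops the cache's own reference once; returns true if that freed the entry. */
  boolean markAsEvicted() {
    return markedAsEvicted.compareAndSet(false, true) && release();
  }

  int refCnt() {
    return refCnt.get();
  }
}
```

With one outstanding reader, `markAsEvicted()` returns false and the recycler only runs when that reader calls `release()`, which mirrors the "de-allocated once all of them release it" behavior described above.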
public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey)
boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry)
Evicts the BlockCacheKey and its corresponding BucketEntry only if BucketEntry.isRpcRef() is false.
NOTE: when evicting from backingMap, only the matched BlockCacheKey and BucketEntry can be removed.
Parameters:
blockCacheKey - BlockCacheKey to evict.
bucketEntry - BucketEntry matched BlockCacheKey to evict.
protected boolean removeFromRamCache(BlockCacheKey cacheKey)
public void logStats()
public long getRealCacheSize()
public long acceptableSize()
long getPartitionSize(float partitionFactor)
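The field summary describes acceptableFactor as the level below which no eviction happens and minFactor as the level to evict down to. A minimal arithmetic sketch of those two thresholds follows. The formulas (capacity multiplied by the factor, rounded down) are an assumption for illustration, the class and method names are hypothetical, and the factor values used in the usage note are arbitrary examples rather than HBase defaults.

```java
/** Illustrative threshold arithmetic; formulas are assumptions, not HBase code. */
final class EvictionThresholdSketch {
  private EvictionThresholdSketch() {}

  /** Size at or above which eviction would start. */
  static long acceptableSize(long capacityBytes, float acceptableFactor) {
    return (long) Math.floor((double) capacityBytes * acceptableFactor);
  }

  /** Size that eviction tries to get back down to. */
  static long minSize(long capacityBytes, float minFactor) {
    return (long) Math.floor((double) capacityBytes * minFactor);
  }

  static boolean needsEviction(long usedBytes, long capacityBytes, float acceptableFactor) {
    return usedBytes >= acceptableSize(capacityBytes, acceptableFactor);
  }

  /** Bytes to release to get back to the minimum size; never negative. */
  static long bytesToFree(long usedBytes, long capacityBytes, float minFactor) {
    return Math.max(0L, usedBytes - minSize(capacityBytes, minFactor));
  }
}
```

For example, with a capacity of 1000 bytes, an acceptable factor of 0.75 and a minimum factor of 0.5, a cache holding 800 bytes is over the acceptable size of 750 and would need to free 300 bytes to return to 500.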
private int bucketSizesAboveThresholdCount(float minFactor)
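private void freeEntireBuckets(int completelyFreeBucketsNeeded)
Finds the buckets that are minimally occupied and not reference counted and frees them completely, without any constraint on the access times of the elements. It completely frees at most the number of buckets passed; sometimes it frees fewer because of changing refCounts.
Parameters:
completelyFreeBucketsNeeded - number of buckets to free
void freeSpace(String why)
Frees space if the used size reaches acceptableSize() or a block of some size could not be allocated.
Parameters:
why - Why we are being called
protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry)
Puts the new bucket entry into backingMap.
Parameters:
key - Block cache key
bucketEntry - Bucket entry to put into backingMap.
See Also: BlockCacheUtil.shouldReplaceExistingCacheBlock(BlockCache blockCache, BlockCacheKey cacheKey, Cacheable newBlock)
private static String getAllocationFailWarningMessage(BucketAllocatorException fle, BucketCache.RAMQueueEntry re)
Prepares and returns a warning message for a BucketAllocatorException.
Parameters:
fle - The exception
re - The RAMQueueEntry for which the exception was thrown.
void doDrain(List<BucketCache.RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException
Flushes the entries in ramCache to the IOEngine and adds a bucket entry to backingMap.
Parameters:
entries - Presumes list passed in here will be processed by this invocation only. No interference expected.
Throws: InterruptedException
static List<BucketCache.RAMQueueEntry> getRAMQueueEntries(BlockingQueue<BucketCache.RAMQueueEntry> q, List<BucketCache.RAMQueueEntry> receptacle) throws InterruptedException
Blocks until elements are available in q, then tries to grab as many as possible before returning.
Parameters:
receptacle - Where to stash the elements taken from queue. We clear before we use it just in case.
q - The queue to take from.
Returns: receptacle laden with elements taken from the queue or empty if none found.
Throws: InterruptedException
private void persistToFile() throws IOException
Throws: IOException
See Also: retrieveFromFile(int[])
private void retrieveFromFile(int[] bucketSizes) throws IOException
Throws: IOException
See Also: persistToFile()
private FileInputStream deleteFileOnClose(File file) throws IOException
Creates an input stream that deletes the file after reading it. Use it in try-with-resources to avoid the following pattern, where an exception thrown from a finally block may mask earlier exceptions: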
File f = ...
try (FileInputStream fis = new FileInputStream(f)) {
// use the input stream
} finally {
if (!f.delete()) throw new IOException("failed to delete");
}
Parameters:
file - the file to read and delete
Throws: IOException - if there is a problem creating the stream
private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) throws IOException
Throws: IOException
private void parsePB(org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos.BucketCacheEntry proto) throws IOException
Throws: IOException
private void checkIOErrorIsTolerated()
Checks whether we tolerate the IO error this time.
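The field summary describes ioErrorsTolerationDuration as the duration of IO errors tolerated before the cache is disabled, alongside an ioErrorStartTime field. One way such a check could work is sketched below. This is an assumption-laden illustration, not the HBase method: `IoErrorTolerationSketch`, `onIoError`, and `onIoSuccess` are hypothetical names, time is passed in explicitly so the logic is easy to test, and resetting the window on a successful IO is an assumption.

```java
/** Illustrative IO error toleration window; not the HBase implementation. */
final class IoErrorTolerationSketch {
  private final long tolerationMillis;
  // -1 means "no error window is currently open".
  private long ioErrorStartTime = -1L;
  private boolean cacheEnabled = true;

  IoErrorTolerationSketch(long tolerationMillis) {
    this.tolerationMillis = tolerationMillis;
  }

  /** Called on each IO failure; disables the cache once errors outlast the window. */
  synchronized void onIoError(long nowMillis) {
    if (ioErrorStartTime < 0) {
      // First error of a new window: just remember when it started.
      ioErrorStartTime = nowMillis;
      return;
    }
    if (cacheEnabled && nowMillis - ioErrorStartTime > tolerationMillis) {
      cacheEnabled = false;
    }
  }

  /** Assumption: a successful IO closes the current error window. */
  synchronized void onIoSuccess() {
    ioErrorStartTime = -1L;
  }

  synchronized boolean isCacheEnabled() {
    return cacheEnabled;
  }
}
```

In this sketch a single failed IO never disables the cache; only errors that keep occurring for longer than the toleration window do.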
private void disableCache()
private void join() throws InterruptedException
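Throws: InterruptedException
public void shutdown()
Description copied from interface: BlockCache
Shuts down the cache.
Specified by: shutdown in interface BlockCache
public CacheStats getStats()
Description copied from interface: BlockCache
Gets the statistics for this block cache.
Specified by: getStats in interface BlockCache
public BucketAllocator getAllocator()
public long heapSize()
Description copied from interface: HeapSize
Returns the approximate 'exclusive deep size' of the implementing object.
public long size()
Description copied from interface: BlockCache
Returns the total size of the block cache, in bytes.
Specified by: size in interface BlockCache
public long getCurrentDataSize()
Description copied from interface: BlockCache
Returns the occupied size of data blocks, in bytes.
Specified by: getCurrentDataSize in interface BlockCache
public long getFreeSize()
Description copied from interface: BlockCache
Returns the free size of the block cache, in bytes.
Specified by: getFreeSize in interface BlockCache
public long getBlockCount()
Description copied from interface: BlockCache
Returns the number of blocks currently cached in the block cache.
Specified by: getBlockCount in interface BlockCache
public long getDataBlockCount()
Description copied from interface: BlockCache
Returns the number of data blocks currently cached in the block cache.
Specified by: getDataBlockCount in interface BlockCache
public long getCurrentSize()
Description copied from interface: BlockCache
Returns the occupied size of the block cache, in bytes.
Specified by: getCurrentSize in interface BlockCache
protected String getAlgorithm()
public int evictBlocksByHfileName(String hfileName)
Evicts all blocks for a specific HFile.
This is used for evict-on-close to remove all blocks of a specific HFile.
Specified by: evictBlocksByHfileName in interface BlockCache
void stopWriterThreads() throws InterruptedException
Only used in tests.
Throws: InterruptedException
public Iterator<CachedBlock> iterator()
Description copied from interface: BlockCache
Returns an Iterator over the blocks in the cache.
Specified by: iterator in interface Iterable<CachedBlock>
Specified by: iterator in interface BlockCache
public BlockCache[] getBlockCaches()
Description copied from interface: BlockCache
Returns the list of sub block caches that make up this one; returns null if there are no sub caches.
Specified by: getBlockCaches in interface BlockCache
public int getRpcRefCount(BlockCacheKey cacheKey)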
float getAcceptableFactor()
float getMinFactor()
float getExtraFreeFactor()
float getSingleFactor()
float getMultiFactor()
float getMemoryFactor()
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.