@InterfaceAudience.Private public class BucketCache extends Object implements BlockCache, HeapSize
BucketCache uses BucketAllocator to allocate and free blocks, and uses BucketCache#ramCache and BucketCache#backingMap to determine whether a given element is in the cache. To store and read block data, the bucket cache can use off-heap memory (ByteBufferIOEngine), mmap (ExclusiveMemoryMmapIOEngine), pmem (SharedMemoryMmapIOEngine), or local files (FileIOEngine).
Eviction uses an algorithm similar to the one in LruBlockCache.
BucketCache can be used mainly as a block cache (see CombinedBlockCache), combined with a BlockCache to decrease CMS GC and heap fragmentation.
It can also be used as a secondary cache (e.g. using a file on ssd/fusionio to store blocks) to enlarge cache space via a victim cache.
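To illustrate how ramCache and backingMap together answer "is this block cached?", here is a minimal sketch. It assumes ramCache (blocks accepted but not yet written out) is consulted before backingMap (blocks already flushed to the IO engine), and it models both maps as ConcurrentHashMaps of strings and offsets. TwoTierLookupSketch, locate() and drain() are hypothetical names, not BucketCache API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of BucketCache's two maps; keys and values are simplified to
// strings and longs so that only the lookup order is illustrated.
class TwoTierLookupSketch {
  // Stand-in for ramCache: blocks accepted for caching but not yet written out.
  final ConcurrentMap<String, String> ramCache = new ConcurrentHashMap<>();
  // Stand-in for backingMap: key -> offset of the block in the IO engine.
  final ConcurrentMap<String, Long> backingMap = new ConcurrentHashMap<>();

  /** Returns "ram" or "backing" for a hit, or null for a miss. */
  String locate(String key) {
    if (ramCache.containsKey(key)) {
      return "ram"; // still staged in memory, waiting for a writer thread
    }
    if (backingMap.containsKey(key)) {
      return "backing"; // already flushed to the IO engine
    }
    return null;
  }

  /** Simulates a writer thread flushing one staged block to the given offset. */
  void drain(String key, long offset) {
    if (ramCache.containsKey(key)) {
      backingMap.put(key, offset); // publish the new location first...
      ramCache.remove(key); // ...then drop the staged copy
    }
  }

  public static void main(String[] args) {
    TwoTierLookupSketch cache = new TwoTierLookupSketch();
    cache.ramCache.put("hfile1@0", "block bytes");
    System.out.println(cache.locate("hfile1@0")); // ram
    cache.drain("hfile1@0", 4096L);
    System.out.println(cache.locate("hfile1@0")); // backing
    System.out.println(cache.locate("hfile1@8192")); // null
  }
}
```

Publishing into backingMap before removing from ramCache means a concurrent reader would never observe the block in neither map.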
Modifier and Type | Class and Description |
---|---|
private class | BucketCache.BucketEntryGroup: Used to group bucket entries into priority buckets. |
(package private) static class | BucketCache.RAMCache: Wraps the delegate ConcurrentMap while maintaining each block's reference count. |
(package private) static class | BucketCache.RAMQueueEntry: Block entry stored in memory with its key, data, and related information. |
private static class | BucketCache.StatisticsThread |
(package private) class | BucketCache.WriterThread |
Modifier and Type | Field and Description |
---|---|
(package private) static String | ACCEPT_FACTOR_CONFIG_NAME |
private float | acceptableFactor: Acceptable size of the cache (no evictions while size < acceptable). |
private AtomicLong | accessCount: Cache access count (sequential ID). |
private String | algorithm: Name of the MessageDigest hash algorithm used to check persistent file integrity; the default is MD5. |
private static int | ALLOCATION_FAIL_LOG_TIME_PERIOD |
private long | allocFailLogPrevTs |
(package private) ConcurrentHashMap<BlockCacheKey,BucketEntry> | backingMap |
private LongAdder | blockNumber: Current number of cached elements. |
private NavigableSet<BlockCacheKey> | blocksByHFile |
private long | blockSize: Approximate block size. |
private BucketAllocator | bucketAllocator |
private long | cacheCapacity |
private boolean | cacheEnabled: Flag indicating whether the cache is enabled. |
private BucketCacheStats | cacheStats |
private static float | DEFAULT_ACCEPT_FACTOR |
private static int | DEFAULT_CACHE_WAIT_TIME |
static int | DEFAULT_ERROR_TOLERATION_DURATION |
private static float | DEFAULT_EXTRA_FREE_FACTOR |
private static String | DEFAULT_FILE_VERIFY_ALGORITHM |
private static int | DEFAULT_FREE_ENTIRE_BLOCK_FACTOR |
(package private) static float | DEFAULT_MEMORY_FACTOR |
(package private) static float | DEFAULT_MIN_FACTOR |
(package private) static float | DEFAULT_MULTI_FACTOR |
(package private) static float | DEFAULT_SINGLE_FACTOR: Priority buckets. |
(package private) static int | DEFAULT_WRITER_QUEUE_ITEMS |
(package private) static int | DEFAULT_WRITER_THREADS |
(package private) static String | EXTRA_FREE_FACTOR_CONFIG_NAME |
private float | extraFreeFactor: Fraction of extra blocks to free when evicting. |
private static String | FILE_VERIFY_ALGORITHM |
private boolean | freeInProgress: Volatile flag tracking whether a free-space operation is in progress. |
private Lock | freeSpaceLock |
private LongAdder | heapSize |
(package private) IOEngine | ioEngine |
private long | ioErrorStartTime |
private int | ioErrorsTolerationDuration: Duration of IO errors tolerated before the cache is disabled; 1 minute by default. |
private static org.slf4j.Logger | LOG |
(package private) static String | MEMORY_FACTOR_CONFIG_NAME |
private float | memoryFactor: In-memory bucket size. |
(package private) static String | MIN_FACTOR_CONFIG_NAME |
private float | minFactor: Minimum threshold of the cache (when evicting, evict until size < min). |
(package private) static String | MULTI_FACTOR_CONFIG_NAME |
private float | multiFactor: Multiple-access bucket size. |
(package private) IdReadWriteLock<Long> | offsetLock: A ReentrantReadWriteLock to lock on a particular block identified by offset. |
private String | persistencePath |
(package private) BucketCache.RAMCache | ramCache |
private LongAdder | realCacheSize |
private ScheduledExecutorService | scheduleThreadPool: Statistics thread schedule pool (for heavy debugging; could be removed). |
(package private) static String | SINGLE_FACTOR_CONFIG_NAME: Priority buckets config. |
private float | singleFactor: Single-access bucket size. |
private static int | statThreadPeriod: Statistics thread. |
(package private) boolean | wait_when_cache: Used in tests. |
(package private) ArrayList<BlockingQueue<BucketCache.RAMQueueEntry>> | writerQueues: A list of writer queues. |
(package private) BucketCache.WriterThread[] | writerThreads |
Constructor and Description |
---|
BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) |
BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, org.apache.hadoop.conf.Configuration conf) |
Modifier and Type | Method and Description |
---|---|
long | acceptableSize() |
(package private) void | blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, boolean evictedByEvictionProcess): Invoked after the bucketEntry is removed from backingMap. |
private int | bucketSizesAboveThresholdCount(float minFactor): Returns the count of bucketSizeInfos that still need free space. |
void | cacheBlock(BlockCacheKey cacheKey, Cacheable buf): Cache the block with the specified name and buffer. |
void | cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory): Cache the block with the specified name and buffer. |
void | cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait): Cache the block to ramCache. |
protected void | cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) |
private void | checkIOErrorIsTolerated(): Check whether we tolerate an IO error this time. |
private ByteBuffAllocator.Recycler | createRecycler(BucketEntry bucketEntry): Create the ByteBuffAllocator.Recycler for BucketEntry.refCnt, which is used as the RefCnt.recycler of HFileBlock.buf returned from getBlock(BlockCacheKey, boolean, boolean, boolean). |
private FileInputStream | deleteFileOnClose(File file): Create an input stream that deletes the file after reading it. |
private void | disableCache(): Used to shut down the cache, or turn it off when something is broken. |
(package private) void | doDrain(List<BucketCache.RAMQueueEntry> entries, ByteBuffer metaBuff): Flush the entries in ramCache to the IOEngine and add bucket entries to backingMap. |
private boolean | doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean evictedByEvictionProcess) |
boolean | evictBlock(BlockCacheKey cacheKey): Try to evict the block from the BlockCache by force. |
boolean | evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey): NOTE: This method is only for tests. |
int | evictBlocksByHfileName(String hfileName): Evicts all blocks for a specific HFile. |
(package private) boolean | evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) |
(package private) void | freeBucketEntry(BucketEntry bucketEntry): Actually free the BucketEntry; may only be invoked once BucketEntry.refCnt has dropped to 0. |
private void | freeEntireBuckets(int completelyFreeBucketsNeeded): Finds the buckets that are minimally occupied and not reference counted, and frees them completely without any constraint on the access times of their elements. It frees at most the number of buckets passed, sometimes fewer because reference counts can change. |
(package private) void | freeSpace(String why): Free space if the used size reaches acceptableSize() or a block of some size could not be allocated. |
(package private) float | getAcceptableFactor() |
protected String | getAlgorithm() |
private static String | getAllocationFailWarningMessage(BucketAllocatorException fle, BucketCache.RAMQueueEntry re): Prepare and return a warning message for a BucketAllocatorException. |
BucketAllocator | getAllocator() |
Cacheable | getBlock(BlockCacheKey key, boolean caching, boolean repeat, boolean updateCacheMetrics): Get the buffer of the block with the specified key. |
BlockCache[] | getBlockCaches(): Returns the list of sub block caches that make up this one; returns null if there are no sub caches. |
long | getBlockCount(): Returns the number of blocks currently cached in the block cache. |
long | getCurrentDataSize(): Returns the occupied size of data blocks, in bytes. |
long | getCurrentSize(): Returns the occupied size of the block cache, in bytes. |
long | getDataBlockCount(): Returns the number of data blocks currently cached in the block cache. |
(package private) float | getExtraFreeFactor() |
long | getFreeSize(): Returns the free size of the block cache, in bytes. |
String | getIoEngine() |
private IOEngine | getIOEngineFromName(String ioEngineName, long capacity, String persistencePath): Get the IOEngine from the IO engine name. |
long | getMaxSize(): Returns the max size of the block cache, in bytes. |
(package private) float | getMemoryFactor() |
(package private) float | getMinFactor() |
(package private) float | getMultiFactor() |
(package private) long | getPartitionSize(float partitionFactor) |
(package private) static List<BucketCache.RAMQueueEntry> | getRAMQueueEntries(BlockingQueue<BucketCache.RAMQueueEntry> q, List<BucketCache.RAMQueueEntry> receptacle): Blocks until elements are available in q, then tries to grab as many as possible before returning. |
long | getRealCacheSize() |
int | getRpcRefCount(BlockCacheKey cacheKey) |
(package private) float | getSingleFactor() |
CacheStats | getStats(): Get the statistics for this block cache. |
long | heapSize(): Return the approximate 'exclusive deep size' of the implementing object. |
(package private) boolean | isCacheEnabled() |
Iterator<CachedBlock> | iterator(): Returns an Iterator over the blocks in the cache. |
private void | join() |
void | logStats() |
private void | parsePB(org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos.BucketCacheEntry proto) |
private void | persistToFile() |
protected void | putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry): Put the new bucket entry into backingMap. |
protected boolean | removeFromRamCache(BlockCacheKey cacheKey) |
private void | retrieveFromFile(int[] bucketSizes) |
private void | sanityCheckConfigs() |
protected boolean | shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) |
void | shutdown(): Shut down the cache. |
long | size(): Returns the total size of the block cache, in bytes. |
protected void | startWriterThreads(): Called by the constructor to start the writer threads. |
(package private) void | stopWriterThreads(): Only used in tests. |
private void | verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) |
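Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface BlockCache: isMetaBlock
Methods inherited from interface java.lang.Iterable: forEach, spliterator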
private static final org.slf4j.Logger LOG
static final String SINGLE_FACTOR_CONFIG_NAME
static final String MULTI_FACTOR_CONFIG_NAME
static final String MEMORY_FACTOR_CONFIG_NAME
static final String EXTRA_FREE_FACTOR_CONFIG_NAME
static final String ACCEPT_FACTOR_CONFIG_NAME
static final String MIN_FACTOR_CONFIG_NAME
static final float DEFAULT_SINGLE_FACTOR
static final float DEFAULT_MULTI_FACTOR
static final float DEFAULT_MEMORY_FACTOR
static final float DEFAULT_MIN_FACTOR
private static final float DEFAULT_EXTRA_FREE_FACTOR
private static final float DEFAULT_ACCEPT_FACTOR
private static final int DEFAULT_FREE_ENTIRE_BLOCK_FACTOR
private static final int statThreadPeriod
static final int DEFAULT_WRITER_THREADS
static final int DEFAULT_WRITER_QUEUE_ITEMS
final transient BucketCache.RAMCache ramCache
transient ConcurrentHashMap<BlockCacheKey,BucketEntry> backingMap
private volatile boolean cacheEnabled
final transient ArrayList<BlockingQueue<BucketCache.RAMQueueEntry>> writerQueues
A list of writer queues, one per running BucketCache.WriterThread. In other words, the work of adding blocks to the BucketCache is divided among the running WriterThreads by taking the hash of the cache key modulo the queue count. When a WriterThread runs, it takes whatever has been recently added and 'drains' the entries to the BucketCache, then updates the ramCache and backingMap accordingly.
final transient BucketCache.WriterThread[] writerThreads
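The routing rule described for writerQueues (hash of the cache key modulo the queue count) can be sketched as below. Keys are plain objects here, and masking the sign bit so that a negative hashCode() still yields a valid index is an assumption of this sketch; WriterQueueRouting and queueIndex() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: pick a writer queue from the cache key's hash, so the same key always
// lands on the same queue (and therefore the same writer thread).
class WriterQueueRouting {
  static int queueIndex(Object cacheKey, int queueCount) {
    // Clear the sign bit so negative hash codes still map into [0, queueCount).
    return (cacheKey.hashCode() & 0x7FFFFFFF) % queueCount;
  }

  public static void main(String[] args) {
    int writerThreads = 3; // hypothetical number of writer threads/queues
    List<BlockingQueue<String>> writerQueues = new ArrayList<>();
    for (int i = 0; i < writerThreads; i++) {
      writerQueues.add(new ArrayBlockingQueue<>(64));
    }
    for (String key : new String[] {"hfileA@0", "hfileA@65536", "hfileB@0"}) {
      int q = queueIndex(key, writerQueues.size());
      writerQueues.get(q).offer(key);
      System.out.println(key + " -> queue " + q);
    }
  }
}
```

Keying the queue on the cache key keeps all writes for one block on a single writer thread while spreading different blocks across threads.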
private volatile boolean freeInProgress
private final transient Lock freeSpaceLock
private final LongAdder realCacheSize
private final LongAdder blockNumber
private final AtomicLong accessCount
private static final int DEFAULT_CACHE_WAIT_TIME
boolean wait_when_cache
private final BucketCacheStats cacheStats
private final String persistencePath
private final long cacheCapacity
private final long blockSize
private final int ioErrorsTolerationDuration
public static final int DEFAULT_ERROR_TOLERATION_DURATION
private volatile long ioErrorStartTime
final transient IdReadWriteLock<Long> offsetLock
A ReentrantReadWriteLock to lock on a particular block identified by offset. The key set of offsets in BucketCache is limited, so a soft reference is the best choice here.
private final NavigableSet<BlockCacheKey> blocksByHFile
private final transient ScheduledExecutorService scheduleThreadPool
private transient BucketAllocator bucketAllocator
private float acceptableFactor
private float minFactor
private float extraFreeFactor
private float singleFactor
private float multiFactor
private float memoryFactor
private static final String FILE_VERIFY_ALGORITHM
private static final String DEFAULT_FILE_VERIFY_ALGORITHM
private String algorithm
Name of the MessageDigest hash algorithm used to check persistent file integrity; the default algorithm is MD5.
private long allocFailLogPrevTs
private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath) throws IOException
Throws: IOException
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration, org.apache.hadoop.conf.Configuration conf) throws IOException
Throws: IOException
private void sanityCheckConfigs()
protected void startWriterThreads()
boolean isCacheEnabled()
public long getMaxSize()
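Description copied from interface: BlockCache
Returns the max size of the block cache, in bytes.
Specified by: getMaxSize in interface BlockCache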
public String getIoEngine()
private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath) throws IOException
Throws: IOException
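As a rough illustration of getIOEngineFromName(), the sketch below maps an engine name to one of the engine types listed in the class description. The prefixes ("offheap", "file:", "files:", "mmap:", "pmem:") are assumptions modeled on common hbase.bucketcache.ioengine values, and the enum is hypothetical: the real method constructs IOEngine instances rather than returning a kind.

```java
// Hypothetical sketch: classify an IO engine name by prefix. It does not build engines.
class IOEngineNameSketch {
  enum EngineKind { BYTE_BUFFER, FILE, EXCLUSIVE_MMAP, SHARED_MMAP }

  static EngineKind kindOf(String ioEngineName) {
    // "files:" needs its own check because it does not start with "file:".
    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
      return EngineKind.FILE; // FileIOEngine over one or more local files
    } else if (ioEngineName.startsWith("offheap")) {
      return EngineKind.BYTE_BUFFER; // ByteBufferIOEngine (off-heap memory)
    } else if (ioEngineName.startsWith("mmap:")) {
      return EngineKind.EXCLUSIVE_MMAP; // ExclusiveMemoryMmapIOEngine
    } else if (ioEngineName.startsWith("pmem:")) {
      return EngineKind.SHARED_MMAP; // SharedMemoryMmapIOEngine
    }
    throw new IllegalArgumentException("Unrecognized IO engine name: " + ioEngineName);
  }

  public static void main(String[] args) {
    System.out.println(kindOf("offheap"));
    System.out.println(kindOf("file:/mnt/ssd/bucketcache"));
    System.out.println(kindOf("pmem:/mnt/pmem0/bucketcache"));
  }
}
```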
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf)
Cache the block with the specified name and buffer.
Specified by: cacheBlock in interface BlockCache
Parameters: cacheKey - block's cache key; buf - block buffer
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory)
Cache the block with the specified name and buffer.
Specified by: cacheBlock in interface BlockCache
Parameters: cacheKey - block's cache key; cachedItem - block buffer; inMemory - whether the block is in-memory
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait)
Cache the block to ramCache.
Parameters: cacheKey - block's cache key; cachedItem - block buffer; inMemory - whether the block is in-memory; wait - if true, wait (block) when the writer queue is full
protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock)
protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait)
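The wait parameter of cacheBlockWithWait() controls what happens when a writer queue is full. The sketch below uses only java.util.concurrent to contrast the two modes; WAIT_MS is a hypothetical stand-in for DEFAULT_CACHE_WAIT_TIME (whose value is not given here), and what the cache does with a rejected block is outside this sketch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the wait flag: wait=false gives up at once when the writer queue is full,
// wait=true blocks for a bounded time before giving up.
class EnqueueSketch {
  static final long WAIT_MS = 50; // hypothetical timeout

  /** Returns true if the entry was queued for a writer thread. */
  static boolean enqueue(BlockingQueue<String> queue, String entry, boolean wait) {
    if (!wait) {
      return queue.offer(entry); // non-blocking: false immediately if the queue is full
    }
    try {
      return queue.offer(entry, WAIT_MS, TimeUnit.MILLISECONDS); // bounded blocking wait
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
      return false;
    }
  }

  public static void main(String[] args) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    System.out.println(enqueue(queue, "block-1", false)); // true: there was room
    System.out.println(enqueue(queue, "block-2", false)); // false: full, no waiting
    System.out.println(enqueue(queue, "block-3", true)); // false after ~50 ms, nobody drained
  }
}
```

Bounding the wait trades a little caller latency for fewer dropped blocks when writer threads fall briefly behind.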
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, boolean updateCacheMetrics)
Get the buffer of the block with the specified key.
Specified by: getBlock in interface BlockCache
Parameters: key - block's cache key; caching - true if the caller caches blocks on cache misses; repeat - whether this is a repeat lookup for the same block; updateCacheMetrics - whether we should update cache metrics
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber, boolean evictedByEvictionProcess)
This method is invoked after the bucketEntry is removed from backingMap.
void freeBucketEntry(BucketEntry bucketEntry)
Actually free the BucketEntry. This may only be invoked once BucketEntry.refCnt has dropped to 0.
public boolean evictBlock(BlockCacheKey cacheKey)
Try to evict the block from the BlockCache by force. We call this in a few cases, including Admin.clearBlockCache(TableName) to clear all blocks for a given table.
First, we try to remove the block from the RAMCache, and then try to evict it from backingMap. Here we evict the block from backingMap immediately, but only release the bucket cache's own reference by calling BucketEntry.markedAsEvicted. If some RPCs still refer to this block, it can only be de-allocated once all of them release it.
NOTICE: we need to grab the write offset lock before releasing the reference from the bucket cache. If we don't, getBlock(BlockCacheKey, boolean, boolean, boolean) may read a BucketEntry with refCnt = 0, which is a memory leak.
Specified by: evictBlock in interface BlockCache
Parameters: cacheKey - block to evict
private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean evictedByEvictionProcess)
Evict the BlockCacheKey and BucketEntry from backingMap and ramCache. NOTE: when evicting from backingMap, only the matched BlockCacheKey and BucketEntry can be removed.
Parameters: cacheKey - BlockCacheKey to evict; bucketEntry - BucketEntry matched to the BlockCacheKey to evict
private ByteBuffAllocator.Recycler createRecycler(BucketEntry bucketEntry)
Create the ByteBuffAllocator.Recycler for BucketEntry.refCnt, which is used as the RefCnt.recycler of HFileBlock.buf returned from getBlock(BlockCacheKey, boolean, boolean, boolean).
NOTE: for getBlock(BlockCacheKey, boolean, boolean, boolean), the RefCnt.recycler of HFileBlock.buf differs between blocks from backingMap and blocks from ramCache:
1. For HFileBlock.buf from backingMap, it is the return value of the current createRecycler(BucketEntry) method.
2. For HFileBlock.buf from ramCache, it is ByteBuffAllocator.putbackBuffer(java.nio.ByteBuffer).
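The reference-counting rule described for evictBlock() and the recycler (the cache holds one reference, each RPC reader holds another, and the space is freed only when the count reaches zero) can be modeled as below. RefCountedEntrySketch is a hypothetical simplification, not BucketEntry; in particular it does not guard against retain() after the count has already reached zero.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of a reference-counted cache entry whose recycler runs exactly once,
// when the last reference (cache or RPC) is released.
class RefCountedEntrySketch {
  private final AtomicInteger refCnt = new AtomicInteger(1); // the cache's own reference
  private final AtomicBoolean markedAsEvicted = new AtomicBoolean(false);
  private final Runnable recycler; // frees the bucket space

  RefCountedEntrySketch(Runnable recycler) {
    this.recycler = recycler;
  }

  void retain() { // an RPC starts reading the block
    refCnt.incrementAndGet();
  }

  void release() { // an RPC (or the cache) drops its reference
    if (refCnt.decrementAndGet() == 0) {
      recycler.run();
    }
  }

  /** Drops the cache's reference once; later calls are no-ops. */
  boolean markAsEvicted() {
    if (markedAsEvicted.compareAndSet(false, true)) {
      release();
      return true;
    }
    return false;
  }

  int refCnt() {
    return refCnt.get();
  }

  public static void main(String[] args) {
    int[] freed = {0};
    RefCountedEntrySketch entry = new RefCountedEntrySketch(() -> freed[0]++);
    entry.retain(); // an RPC is reading the block
    entry.markAsEvicted(); // evicted, but the RPC still holds a reference
    System.out.println(freed[0]); // 0
    entry.release(); // the RPC finishes; the recycler runs now
    System.out.println(freed[0]); // 1
  }
}
```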
public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey)
boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry)
Evict the BlockCacheKey and its corresponding BucketEntry only if BucketEntry.isRpcRef() is false. NOTE: when evicting from backingMap, only the matched BlockCacheKey and BucketEntry can be removed.
Parameters: blockCacheKey - BlockCacheKey to evict; bucketEntry - BucketEntry matched to the BlockCacheKey to evict
protected boolean removeFromRamCache(BlockCacheKey cacheKey)
public void logStats()
public long getRealCacheSize()
public long acceptableSize()
long getPartitionSize(float partitionFactor)
private int bucketSizesAboveThresholdCount(float minFactor)
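The thresholds behind acceptableSize(), getPartitionSize() and bucketSizesAboveThresholdCount() can be worked through with plain arithmetic. This sketch assumes acceptableSize = totalSize * acceptableFactor, a partition size of totalSize * partitionFactor * minFactor, and a per-bucket-size free goal of (1 - minFactor) of its slots (at least one). These formulas are consistent with the field descriptions above but are not taken from the source, and the factor values in main() are illustrative only.

```java
// Illustrative eviction-threshold arithmetic; formulas are assumptions, see lead-in.
class EvictionThresholdSketch {
  /** Used size above which freeing space would be triggered. */
  static long acceptableSize(long totalSize, float acceptableFactor) {
    return (long) Math.floor((double) totalSize * acceptableFactor);
  }

  /** Target size for one priority partition (single, multi or memory). */
  static long partitionSize(long totalSize, float partitionFactor, float minFactor) {
    return (long) Math.floor((double) totalSize * partitionFactor * minFactor);
  }

  /** Counts bucket sizes whose free slots are below max(1, (1 - minFactor) * total). */
  static int bucketSizesAboveThresholdCount(long[] totalCounts, long[] freeCounts, float minFactor) {
    int count = 0;
    for (int i = 0; i < totalCounts.length; i++) {
      long freeGoal = Math.max((long) Math.floor(totalCounts[i] * (1.0 - minFactor)), 1L);
      if (freeCounts[i] < freeGoal) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    long total = 10L * 1024 * 1024 * 1024; // hypothetical 10 GiB of bucket space
    System.out.println("acceptable = " + acceptableSize(total, 0.95f));
    System.out.println("single partition = " + partitionSize(total, 0.25f, 0.85f));
    System.out.println("bucket sizes needing space = "
        + bucketSizesAboveThresholdCount(new long[] {100, 10}, new long[] {10, 5}, 0.85f));
  }
}
```

Computing in double avoids the precision loss that long-times-float arithmetic would introduce for large capacities.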
private void freeEntireBuckets(int completelyFreeBucketsNeeded)
Parameters: completelyFreeBucketsNeeded - number of buckets to free
void freeSpace(String why)
Parameters: why - why we are being called
protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry)
Parameters: key - block cache key; bucketEntry - bucket entry to put into backingMap
See Also: BlockCacheUtil.shouldReplaceExistingCacheBlock(BlockCache blockCache, BlockCacheKey cacheKey, Cacheable newBlock)
private static String getAllocationFailWarningMessage(BucketAllocatorException fle, BucketCache.RAMQueueEntry re)
Parameters: fle - the exception; re - the RAMQueueEntry for which the exception was thrown
void doDrain(List<BucketCache.RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException
Parameters: entries - presumes the list passed in here will be processed by this invocation only; no interference expected
Throws: InterruptedException
static List<BucketCache.RAMQueueEntry> getRAMQueueEntries(BlockingQueue<BucketCache.RAMQueueEntry> q, List<BucketCache.RAMQueueEntry> receptacle) throws InterruptedException
Blocks until elements are available in q, then tries to grab as many as possible before returning.
Parameters: receptacle - where to stash the elements taken from the queue; we clear it before use, just in case; q - the queue to take from
Returns: receptacle laden with elements taken from the queue, or empty if none found
Throws: InterruptedException
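The "block until something arrives, then grab everything available" pattern described for getRAMQueueEntries() maps directly onto BlockingQueue.take() followed by drainTo(). This is a generic sketch; the real method operates on RAMQueueEntry, and DrainSketch and takeBatch() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: wait for one element, then move every other queued element without blocking.
class DrainSketch {
  static <T> List<T> takeBatch(BlockingQueue<T> q, List<T> receptacle)
      throws InterruptedException {
    receptacle.clear(); // start from an empty list, just in case
    receptacle.add(q.take()); // block until at least one element is present
    q.drainTo(receptacle); // then grab whatever else is queued right now
    return receptacle;
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> q = new LinkedBlockingQueue<>();
    q.add("e1");
    q.add("e2");
    System.out.println(takeBatch(q, new ArrayList<>())); // [e1, e2]
  }
}
```

Batching this way lets a writer thread amortize one IO engine write pass over many entries.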
private void persistToFile() throws IOException
Throws: IOException
See Also: retrieveFromFile(int[])
private void retrieveFromFile(int[] bucketSizes) throws IOException
Throws: IOException
See Also: persistToFile()
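The algorithm field names the MessageDigest hash used to check persistent file integrity (MD5 by default). The sketch below only shows the checksum comparison itself; what BucketCache actually digests during persistToFile()/retrieveFromFile() and where it stores the result are not covered, and PersistenceChecksumSketch is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch: compute a digest when persisting, recompute and compare when retrieving.
class PersistenceChecksumSketch {
  static byte[] checksum(byte[] data, String algorithm) {
    try {
      return MessageDigest.getInstance(algorithm).digest(data);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalArgumentException("Unsupported digest algorithm: " + algorithm, e);
    }
  }

  /** True if data still matches the checksum recorded when it was persisted. */
  static boolean verify(byte[] data, byte[] expected, String algorithm) {
    return MessageDigest.isEqual(checksum(data, algorithm), expected);
  }

  public static void main(String[] args) {
    byte[] data = "persisted cache state".getBytes(StandardCharsets.UTF_8);
    byte[] recorded = checksum(data, "MD5");
    System.out.println(verify(data, recorded, "MD5")); // true
    data[0] ^= 1; // simulate corruption
    System.out.println(verify(data, recorded, "MD5")); // false
  }
}
```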
private FileInputStream deleteFileOnClose(File file) throws IOException
Create an input stream that deletes the file after reading it, replacing the manual pattern: File f = ...; try (FileInputStream fis = new FileInputStream(f)) { /* use the input stream */ } finally { if (!f.delete()) throw new IOException("failed to delete"); }
Parameters: file - the file to read and delete
Throws: IOException - if there is a problem creating the stream
private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) throws IOException
Throws: IOException
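A delete-on-close stream like the one deleteFileOnClose() describes can be built by overriding close() on an anonymous FileInputStream subclass. This mirrors the described purpose rather than the actual implementation; DeleteOnCloseSketch is hypothetical, and it only throws if the file still exists after the delete attempt, so a second close() is harmless.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;

// Sketch: an input stream that deletes its backing file once closed.
class DeleteOnCloseSketch {
  static FileInputStream deleteFileOnClose(File file) throws IOException {
    return new FileInputStream(file) {
      @Override
      public void close() throws IOException {
        super.close(); // release the handle first; some platforms refuse to delete open files
        if (!file.delete() && file.exists()) {
          throw new IOException("Failed to delete " + file);
        }
      }
    };
  }

  public static void main(String[] args) throws IOException {
    File f = File.createTempFile("bucketcache-sketch", ".tmp");
    Files.write(f.toPath(), new byte[] {1, 2, 3});
    try (FileInputStream in = deleteFileOnClose(f)) {
      System.out.println(in.read()); // 1
    }
    System.out.println(f.exists()); // false
  }
}
```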
private void parsePB(org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos.BucketCacheEntry proto) throws IOException
Throws: IOException
private void checkIOErrorIsTolerated()
private void disableCache()
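The fields ioErrorStartTime and ioErrorsTolerationDuration (1 minute by default) suggest how checkIOErrorIsTolerated() and disableCache() interact: the first IO error starts a timer, and the cache is disabled only if errors keep occurring past the toleration window. In this sketch the injectable clock and the onSuccess() reset are assumptions made for testability, and setting a flag stands in for disableCache().

```java
import java.util.function.LongSupplier;

// Sketch of an IO-error toleration window with a pluggable clock (milliseconds).
class IOErrorToleranceSketch {
  private final long tolerationMs;
  private final LongSupplier clock;
  private long ioErrorStartTime = -1; // -1 means no error streak in progress
  private boolean cacheEnabled = true;

  IOErrorToleranceSketch(long tolerationMs, LongSupplier clock) {
    this.tolerationMs = tolerationMs;
    this.clock = clock;
  }

  /** Called after a failed IO engine read or write. */
  void onIOError() {
    long now = clock.getAsLong();
    if (ioErrorStartTime < 0) {
      ioErrorStartTime = now; // first error: start the toleration window
    } else if (cacheEnabled && now - ioErrorStartTime > tolerationMs) {
      cacheEnabled = false; // errors outlasted the window: stand-in for disableCache()
    }
  }

  /** Called after a successful IO engine operation; ends the error streak. */
  void onSuccess() {
    ioErrorStartTime = -1;
  }

  boolean isCacheEnabled() {
    return cacheEnabled;
  }
}
```

Tolerating a short burst of errors avoids disabling the whole cache over a transient device hiccup, while a persistent failure still shuts it off.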
private void join() throws InterruptedException
Throws: InterruptedException
public void shutdown()
Description copied from interface: BlockCache
Shut down the cache.
Specified by: shutdown in interface BlockCache
public CacheStats getStats()
Description copied from interface: BlockCache
Get the statistics for this block cache.
Specified by: getStats in interface BlockCache
public BucketAllocator getAllocator()
public long heapSize()
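Description copied from interface: HeapSize
Return the approximate 'exclusive deep size' of the implementing object.
Specified by: heapSize in interface HeapSize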
public long size()
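Description copied from interface: BlockCache
Returns the total size of the block cache, in bytes.
Specified by: size in interface BlockCache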
public long getCurrentDataSize()
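Description copied from interface: BlockCache
Returns the occupied size of data blocks, in bytes.
Specified by: getCurrentDataSize in interface BlockCache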
public long getFreeSize()
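Description copied from interface: BlockCache
Returns the free size of the block cache, in bytes.
Specified by: getFreeSize in interface BlockCache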
public long getBlockCount()
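Description copied from interface: BlockCache
Returns the number of blocks currently cached in the block cache.
Specified by: getBlockCount in interface BlockCache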
public long getDataBlockCount()
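Description copied from interface: BlockCache
Returns the number of data blocks currently cached in the block cache.
Specified by: getDataBlockCount in interface BlockCache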
public long getCurrentSize()
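Description copied from interface: BlockCache
Returns the occupied size of the block cache, in bytes.
Specified by: getCurrentSize in interface BlockCache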
protected String getAlgorithm()
public int evictBlocksByHfileName(String hfileName)
This is used for evict-on-close to remove all blocks of a specific HFile.
Specified by: evictBlocksByHfileName in interface BlockCache
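Because blocksByHFile is a NavigableSet, all blocks of one HFile can be found as a single contiguous range if keys sort by file name and then offset. The sketch below assumes that ordering and uses a hypothetical Key class and ConcurrentSkipListSet; a real cache would also evict each block from its maps, not just drop the key.

```java
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;

// Sketch: evict every key of one HFile via a range query on a sorted key set.
class EvictByHFileSketch {
  static final class Key {
    final String hfileName;
    final long offset;

    Key(String hfileName, long offset) {
      this.hfileName = hfileName;
      this.offset = offset;
    }
  }

  // Sort by file name, then by offset, so one file's blocks are contiguous.
  static final Comparator<Key> ORDER =
      Comparator.comparing((Key k) -> k.hfileName).thenComparingLong(k -> k.offset);

  final NavigableSet<Key> blocksByHFile = new ConcurrentSkipListSet<>(ORDER);

  /** Removes all keys of the given HFile and returns how many were removed. */
  int evictBlocksByHfileName(String hfileName) {
    NavigableSet<Key> range = blocksByHFile.subSet(
        new Key(hfileName, Long.MIN_VALUE), true, new Key(hfileName, Long.MAX_VALUE), true);
    int evicted = 0;
    for (Key key : range) { // weakly consistent iterator: safe to remove while iterating
      if (blocksByHFile.remove(key)) {
        evicted++;
      }
    }
    return evicted;
  }
}
```

The range query touches only the target file's keys, instead of scanning every cached block.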
void stopWriterThreads() throws InterruptedException
Throws: InterruptedException
public Iterator<CachedBlock> iterator()
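Description copied from interface: BlockCache
Returns an Iterator over the blocks in the cache.
Specified by: iterator in interface Iterable<CachedBlock>
Specified by: iterator in interface BlockCache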
public BlockCache[] getBlockCaches()
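Description copied from interface: BlockCache
Returns the list of sub block caches that make up this one; returns null if there are no sub caches.
Specified by: getBlockCaches in interface BlockCache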
public int getRpcRefCount(BlockCacheKey cacheKey)
float getAcceptableFactor()
float getMinFactor()
float getExtraFreeFactor()
float getSingleFactor()
float getMultiFactor()
float getMemoryFactor()
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.