001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.util.Comparator; 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.concurrent.locks.ReentrantReadWriteLock; 025import java.util.function.Function; 026import org.apache.hadoop.hbase.io.ByteBuffAllocator; 027import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 028import org.apache.hadoop.hbase.io.hfile.BlockPriority; 029import org.apache.hadoop.hbase.io.hfile.Cacheable; 030import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; 031import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; 032import org.apache.hadoop.hbase.nio.ByteBuff; 033import org.apache.hadoop.hbase.nio.HBaseReferenceCounted; 034import org.apache.hadoop.hbase.nio.RefCnt; 035import org.apache.hadoop.hbase.util.IdReadWriteLock; 036import org.apache.yetus.audience.InterfaceAudience; 037 038/** 039 * Item in cache. We expect this to be where most memory goes. Java uses 8 bytes just for object 040 * headers; after this, we want to use as little as possible - so we only use 8 bytes, but in order 041 * to do so we end up messing around with all this Java casting stuff. Offset stored as 5 bytes that 042 * make up the long. Doubt we'll see devices this big for ages. Offsets are divided by 256. So 5 043 * bytes gives us 256TB or so. 044 */ 045@InterfaceAudience.Private 046public class BucketEntry implements HBaseReferenceCounted { 047 // access counter comparator, descending order 048 static final Comparator<BucketEntry> COMPARATOR = 049 Comparator.comparingLong(BucketEntry::getAccessCounter).reversed(); 050 051 private int offsetBase; 052 private int length; 053 054 private int onDiskSizeWithHeader; 055 private byte offset1; 056 057 /** 058 * The index of the deserializer that can deserialize this BucketEntry content. See 059 * {@link CacheableDeserializerIdManager} for hosting of index to serializers. 060 */ 061 byte deserializerIndex; 062 063 private volatile long accessCounter; 064 private BlockPriority priority; 065 066 /** 067 * <pre> 068 * The RefCnt means how many paths are referring the {@link BucketEntry}, there are two cases: 069 * 1.If the {@link IOEngine#usesSharedMemory()} is false(eg.{@link FileIOEngine}),the refCnt is 070 * always 1 until this {@link BucketEntry} is evicted from {@link BucketCache#backingMap}.Even 071 * if the corresponding {@link HFileBlock} is referenced by RPC reading, the refCnt should not 072 * increase. 073 * 074 * 2.If the {@link IOEngine#usesSharedMemory()} is true(eg.{@link ByteBufferIOEngine}),each RPC 075 * reading path is considering as one path, the {@link BucketCache#backingMap} reference is 076 * also considered a path. NOTICE that if two read RPC path hit the same {@link BucketEntry}, 077 * then the {@link HFileBlock}s the two RPC referred will share the same refCnt instance with 078 * the {@link BucketEntry},so the refCnt will increase or decrease as the following: 079 * (1) when writerThread flush the block into IOEngine and add the bucketEntry into backingMap, 080 * the refCnt ++; 081 * (2) If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--; 082 * it usually happen when HFile is closing or someone call the clearBucketCache by force. 083 * (3) The read RPC path start to refer the block which is backend by the memory area in 084 * bucketEntry, then refCnt ++ ; 085 * (4) The read RPC patch shipped the response, and release the block. then refCnt--; 086 * Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area. 087 * </pre> 088 */ 089 private final RefCnt refCnt; 090 final AtomicBoolean markedAsEvicted; 091 final ByteBuffAllocator allocator; 092 093 /** 094 * Time this block was cached. Presumes we are created just before we are added to the cache. 095 */ 096 private long cachedTime = System.nanoTime(); 097 098 /** 099 * @param createRecycler used to free this {@link BucketEntry} when {@link BucketEntry#refCnt} 100 * becoming 0. NOTICE that {@link ByteBuffAllocator#NONE} could only be used 101 * for test. 102 */ 103 BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter, 104 boolean inMemory, Function<BucketEntry, Recycler> createRecycler, ByteBuffAllocator allocator) { 105 this(offset, length, onDiskSizeWithHeader, accessCounter, System.nanoTime(), inMemory, 106 createRecycler, allocator); 107 } 108 109 BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter, 110 long cachedTime, boolean inMemory, Function<BucketEntry, Recycler> createRecycler, 111 ByteBuffAllocator allocator) { 112 if (createRecycler == null) { 113 throw new IllegalArgumentException("createRecycler could not be null!"); 114 } 115 setOffset(offset); 116 this.length = length; 117 this.onDiskSizeWithHeader = onDiskSizeWithHeader; 118 this.accessCounter = accessCounter; 119 this.cachedTime = cachedTime; 120 this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI; 121 this.refCnt = RefCnt.create(createRecycler.apply(this)); 122 this.markedAsEvicted = new AtomicBoolean(false); 123 this.allocator = allocator; 124 } 125 126 long offset() { 127 // Java has no unsigned numbers, so this needs the L cast otherwise it will be sign extended 128 // as a negative number. 129 long o = ((long) offsetBase) & 0xFFFFFFFFL; 130 // The 0xFF here does not need the L cast because it is treated as a positive int. 131 o += (((long) (offset1)) & 0xFF) << 32; 132 return o << 8; 133 } 134 135 private void setOffset(long value) { 136 assert (value & 0xFF) == 0; 137 value >>= 8; 138 offsetBase = (int) value; 139 offset1 = (byte) (value >> 32); 140 } 141 142 public int getLength() { 143 return length; 144 } 145 146 CacheableDeserializer<Cacheable> deserializerReference() { 147 return CacheableDeserializerIdManager.getDeserializer(deserializerIndex); 148 } 149 150 void setDeserializerReference(CacheableDeserializer<Cacheable> deserializer) { 151 this.deserializerIndex = (byte) deserializer.getDeserializerIdentifier(); 152 } 153 154 long getAccessCounter() { 155 return accessCounter; 156 } 157 158 /** 159 * Block has been accessed. Update its local access counter. 160 */ 161 void access(long accessCounter) { 162 this.accessCounter = accessCounter; 163 if (this.priority == BlockPriority.SINGLE) { 164 this.priority = BlockPriority.MULTI; 165 } 166 } 167 168 public BlockPriority getPriority() { 169 return this.priority; 170 } 171 172 public long getCachedTime() { 173 return cachedTime; 174 } 175 176 public int getOnDiskSizeWithHeader() { 177 return onDiskSizeWithHeader; 178 } 179 180 /** 181 * The {@link BucketCache} will try to release its reference to this BucketEntry many times. we 182 * must make sure the idempotent, otherwise it'll decrease the RPC's reference count in advance, 183 * then for RPC memory leak happen. 184 * @return true if we deallocate this entry successfully. 185 */ 186 boolean markAsEvicted() { 187 if (markedAsEvicted.compareAndSet(false, true)) { 188 return this.release(); 189 } 190 return false; 191 } 192 193 /** 194 * Check whether have some RPC patch referring this block.<br/> 195 * For {@link IOEngine#usesSharedMemory()} is true(eg.{@link ByteBufferIOEngine}), there're two 196 * case: <br> 197 * 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br> 198 * 2. If current refCnt is equal to 1 and the markedAtEvicted is true, the it means backingMap has 199 * released its reference, the remaining reference can only be from RPC path. <br> 200 * We use this check to decide whether we can free the block area: when cached size exceed the 201 * acceptable size, our eviction policy will choose those stale blocks without any RPC reference 202 * and the RPC referred block will be excluded. <br/> 203 * <br/> 204 * For {@link IOEngine#usesSharedMemory()} is false(eg.{@link FileIOEngine}), 205 * {@link BucketEntry#refCnt} is always 1 until it is evicted from {@link BucketCache#backingMap}, 206 * so {@link BucketEntry#isRpcRef()} is always return false. 207 * @return true to indicate there're some RPC referring the block. 208 */ 209 boolean isRpcRef() { 210 boolean evicted = markedAsEvicted.get(); 211 return this.refCnt() > 1 || (evicted && refCnt() == 1); 212 } 213 214 Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException { 215 return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt)); 216 } 217 218 Cacheable wrapAsCacheable(ByteBuff buf) throws IOException { 219 return this.deserializerReference().deserialize(buf, allocator); 220 } 221 222 interface BucketEntryHandler<T> { 223 T handle(); 224 } 225 226 <T> T withWriteLock(IdReadWriteLock<Long> offsetLock, BucketEntryHandler<T> handler) { 227 ReentrantReadWriteLock lock = offsetLock.getLock(this.offset()); 228 try { 229 lock.writeLock().lock(); 230 return handler.handle(); 231 } finally { 232 lock.writeLock().unlock(); 233 } 234 } 235 236 @Override 237 public int refCnt() { 238 return this.refCnt.refCnt(); 239 } 240 241 @Override 242 public BucketEntry retain() { 243 refCnt.retain(); 244 return this; 245 } 246 247 /** 248 * We've three cases to release refCnt now: <br> 249 * 1. BucketCache#evictBlock, it will release the backingMap's reference by force because we're 250 * closing file or clear the bucket cache or some corruption happen. when all rpc references gone, 251 * then free the area in bucketAllocator. <br> 252 * 2. BucketCache#returnBlock . when rpc shipped, we'll release the block, only when backingMap 253 * also release its refCnt (case.1 will do this) and no other rpc reference, then it will free the 254 * area in bucketAllocator. <br> 255 * 3.evict those block without any rpc reference if cache size exceeded. we'll only free those 256 * blocks with zero rpc reference count. 257 * @return true to indicate we've decreased to zero and do the de-allocation. 258 */ 259 @Override 260 public boolean release() { 261 return refCnt.release(); 262 } 263}