001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.util.Comparator; 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.concurrent.locks.ReentrantReadWriteLock; 025import java.util.function.Function; 026import org.apache.hadoop.hbase.io.ByteBuffAllocator; 027import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 028import org.apache.hadoop.hbase.io.hfile.BlockPriority; 029import org.apache.hadoop.hbase.io.hfile.Cacheable; 030import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; 031import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; 032import org.apache.hadoop.hbase.nio.ByteBuff; 033import org.apache.hadoop.hbase.nio.HBaseReferenceCounted; 034import org.apache.hadoop.hbase.nio.RefCnt; 035import org.apache.hadoop.hbase.util.IdReadWriteLock; 036import org.apache.yetus.audience.InterfaceAudience; 037 038/** 039 * Item in cache. We expect this to be where most memory goes. Java uses 8 bytes just for object 040 * headers; after this, we want to use as little as possible - so we only use 8 bytes, but in order 041 * to do so we end up messing around with all this Java casting stuff. Offset stored as 5 bytes that 042 * make up the long. Doubt we'll see devices this big for ages. Offsets are divided by 256. So 5 043 * bytes gives us 256TB or so. 044 */ 045@InterfaceAudience.Private 046class BucketEntry implements HBaseReferenceCounted { 047 // access counter comparator, descending order 048 static final Comparator<BucketEntry> COMPARATOR = 049 Comparator.comparingLong(BucketEntry::getAccessCounter).reversed(); 050 051 private int offsetBase; 052 private int length; 053 private byte offset1; 054 055 /** 056 * The index of the deserializer that can deserialize this BucketEntry content. See 057 * {@link CacheableDeserializerIdManager} for hosting of index to serializers. 058 */ 059 byte deserializerIndex; 060 061 private volatile long accessCounter; 062 private BlockPriority priority; 063 064 /** 065 * <pre> 066 * The RefCnt means how many paths are referring the {@link BucketEntry}, there are two cases: 067 * 1.If the {@link IOEngine#usesSharedMemory()} is false(eg.{@link FileIOEngine}),the refCnt is 068 * always 1 until this {@link BucketEntry} is evicted from {@link BucketCache#backingMap}.Even 069 * if the corresponding {@link HFileBlock} is referenced by RPC reading, the refCnt should not 070 * increase. 071 * 072 * 2.If the {@link IOEngine#usesSharedMemory()} is true(eg.{@link ByteBufferIOEngine}),each RPC 073 * reading path is considering as one path, the {@link BucketCache#backingMap} reference is 074 * also considered a path. NOTICE that if two read RPC path hit the same {@link BucketEntry}, 075 * then the {@link HFileBlock}s the two RPC referred will share the same refCnt instance with 076 * the {@link BucketEntry},so the refCnt will increase or decrease as the following: 077 * (1) when writerThread flush the block into IOEngine and add the bucketEntry into backingMap, 078 * the refCnt ++; 079 * (2) If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--; 080 * it usually happen when HFile is closing or someone call the clearBucketCache by force. 081 * (3) The read RPC path start to refer the block which is backend by the memory area in 082 * bucketEntry, then refCnt ++ ; 083 * (4) The read RPC patch shipped the response, and release the block. then refCnt--; 084 * Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area. 085 * </pre> 086 */ 087 private final RefCnt refCnt; 088 final AtomicBoolean markedAsEvicted; 089 final ByteBuffAllocator allocator; 090 091 /** 092 * Time this block was cached. Presumes we are created just before we are added to the cache. 093 */ 094 private final long cachedTime = System.nanoTime(); 095 096 /** 097 * @param createRecycler used to free this {@link BucketEntry} when {@link BucketEntry#refCnt} 098 * becoming 0. NOTICE that {@link ByteBuffAllocator#NONE} could only be used 099 * for test. 100 */ 101 BucketEntry(long offset, int length, long accessCounter, boolean inMemory, 102 Function<BucketEntry, Recycler> createRecycler, ByteBuffAllocator allocator) { 103 if (createRecycler == null) { 104 throw new IllegalArgumentException("createRecycler could not be null!"); 105 } 106 setOffset(offset); 107 this.length = length; 108 this.accessCounter = accessCounter; 109 this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI; 110 this.refCnt = RefCnt.create(createRecycler.apply(this)); 111 112 this.markedAsEvicted = new AtomicBoolean(false); 113 this.allocator = allocator; 114 } 115 116 long offset() { 117 // Java has no unsigned numbers, so this needs the L cast otherwise it will be sign extended 118 // as a negative number. 119 long o = ((long) offsetBase) & 0xFFFFFFFFL; 120 // The 0xFF here does not need the L cast because it is treated as a positive int. 121 o += (((long) (offset1)) & 0xFF) << 32; 122 return o << 8; 123 } 124 125 private void setOffset(long value) { 126 assert (value & 0xFF) == 0; 127 value >>= 8; 128 offsetBase = (int) value; 129 offset1 = (byte) (value >> 32); 130 } 131 132 public int getLength() { 133 return length; 134 } 135 136 CacheableDeserializer<Cacheable> deserializerReference() { 137 return CacheableDeserializerIdManager.getDeserializer(deserializerIndex); 138 } 139 140 void setDeserializerReference(CacheableDeserializer<Cacheable> deserializer) { 141 this.deserializerIndex = (byte) deserializer.getDeserializerIdentifier(); 142 } 143 144 long getAccessCounter() { 145 return accessCounter; 146 } 147 148 /** 149 * Block has been accessed. Update its local access counter. 150 */ 151 void access(long accessCounter) { 152 this.accessCounter = accessCounter; 153 if (this.priority == BlockPriority.SINGLE) { 154 this.priority = BlockPriority.MULTI; 155 } 156 } 157 158 public BlockPriority getPriority() { 159 return this.priority; 160 } 161 162 long getCachedTime() { 163 return cachedTime; 164 } 165 166 /** 167 * The {@link BucketCache} will try to release its reference to this BucketEntry many times. we 168 * must make sure the idempotent, otherwise it'll decrease the RPC's reference count in advance, 169 * then for RPC memory leak happen. 170 * @return true if we deallocate this entry successfully. 171 */ 172 boolean markAsEvicted() { 173 if (markedAsEvicted.compareAndSet(false, true)) { 174 return this.release(); 175 } 176 return false; 177 } 178 179 /** 180 * Check whether have some RPC patch referring this block.<br/> 181 * For {@link IOEngine#usesSharedMemory()} is true(eg.{@link ByteBufferIOEngine}), there're two 182 * case: <br> 183 * 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br> 184 * 2. If current refCnt is equal to 1 and the markedAtEvicted is true, the it means backingMap has 185 * released its reference, the remaining reference can only be from RPC path. <br> 186 * We use this check to decide whether we can free the block area: when cached size exceed the 187 * acceptable size, our eviction policy will choose those stale blocks without any RPC reference 188 * and the RPC referred block will be excluded. <br/> 189 * <br/> 190 * For {@link IOEngine#usesSharedMemory()} is false(eg.{@link FileIOEngine}), 191 * {@link BucketEntry#refCnt} is always 1 until it is evicted from {@link BucketCache#backingMap}, 192 * so {@link BucketEntry#isRpcRef()} is always return false. 193 * @return true to indicate there're some RPC referring the block. 194 */ 195 boolean isRpcRef() { 196 boolean evicted = markedAsEvicted.get(); 197 return this.refCnt() > 1 || (evicted && refCnt() == 1); 198 } 199 200 Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException { 201 return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt)); 202 } 203 204 Cacheable wrapAsCacheable(ByteBuff buf) throws IOException { 205 return this.deserializerReference().deserialize(buf, allocator); 206 } 207 208 interface BucketEntryHandler<T> { 209 T handle(); 210 } 211 212 <T> T withWriteLock(IdReadWriteLock<Long> offsetLock, BucketEntryHandler<T> handler) { 213 ReentrantReadWriteLock lock = offsetLock.getLock(this.offset()); 214 try { 215 lock.writeLock().lock(); 216 return handler.handle(); 217 } finally { 218 lock.writeLock().unlock(); 219 } 220 } 221 222 @Override 223 public int refCnt() { 224 return this.refCnt.refCnt(); 225 } 226 227 @Override 228 public BucketEntry retain() { 229 refCnt.retain(); 230 return this; 231 } 232 233 /** 234 * We've three cases to release refCnt now: <br> 235 * 1. BucketCache#evictBlock, it will release the backingMap's reference by force because we're 236 * closing file or clear the bucket cache or some corruption happen. when all rpc references gone, 237 * then free the area in bucketAllocator. <br> 238 * 2. BucketCache#returnBlock . when rpc shipped, we'll release the block, only when backingMap 239 * also release its refCnt (case.1 will do this) and no other rpc reference, then it will free the 240 * area in bucketAllocator. <br> 241 * 3.evict those block without any rpc reference if cache size exceeded. we'll only free those 242 * blocks with zero rpc reference count, as the {@link BucketEntry#markStaleAsEvicted()} do. 243 * @return true to indicate we've decreased to zero and do the de-allocation. 244 */ 245 @Override 246 public boolean release() { 247 return refCnt.release(); 248 } 249}