/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.HBaseReferenceCounted;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Item in cache. We expect this to be where most memory goes. Java uses 8 bytes just for object
 * headers; after this, we want to use as little as possible - so we only use 8 bytes, but in order
 * to do so we end up messing around with all this Java casting stuff. The offset is stored as 5
 * bytes that make up the long. Offsets are divided by 256, so 5 bytes gives us 256TB or so, and we
 * doubt we'll see devices that big for ages.
 */
@InterfaceAudience.Private
public class BucketEntry implements HBaseReferenceCounted {
  // access counter comparator, descending order
  static final Comparator<BucketEntry> COMPARATOR =
    Comparator.comparingLong(BucketEntry::getAccessCounter).reversed();

  private int offsetBase;
  private int length;

  private int onDiskSizeWithHeader;
  private byte offset1;

  /**
   * The index of the deserializer that can deserialize this BucketEntry content. See
   * {@link CacheableDeserializerIdManager} for the mapping from index to deserializer.
   */
  byte deserializerIndex;

  private volatile long accessCounter;
  private BlockPriority priority;

  /**
   * <pre>
   * The RefCnt records how many paths refer to the {@link BucketEntry}. There are two cases:
   * 1. If {@link IOEngine#usesSharedMemory()} is false (e.g. {@link FileIOEngine}), the refCnt is
   *    always 1 until this {@link BucketEntry} is evicted from {@link BucketCache#backingMap}. Even
   *    if the corresponding {@link HFileBlock} is referenced by an RPC read, the refCnt does not
   *    increase.
   *
   * 2. If {@link IOEngine#usesSharedMemory()} is true (e.g. {@link ByteBufferIOEngine}), each RPC
   *    read path is considered one path, and the {@link BucketCache#backingMap} reference is also
   *    considered a path. NOTICE that if two read RPC paths hit the same {@link BucketEntry}, the
   *    {@link HFileBlock}s the two RPCs refer to share the same refCnt instance with the
   *    {@link BucketEntry}, so the refCnt increases or decreases as follows:
   *    (1) when the writer thread flushes the block into the IOEngine and adds the bucketEntry into
   *        the backingMap, refCnt++;
   *    (2) if the BucketCache evicts the block and removes the bucketEntry from the backingMap,
   *        refCnt--; this usually happens when an HFile is closing or someone clears the bucket
   *        cache by force;
   *    (3) when a read RPC path starts to refer to the block backed by the memory area in the
   *        bucketEntry, refCnt++;
   *    (4) when the read RPC path has shipped the response and releases the block, refCnt--.
   *    Once the refCnt decreases to zero, the {@link BucketAllocator} will free the block area.
   * </pre>
   */
  private final RefCnt refCnt;
  final AtomicBoolean markedAsEvicted;
  final ByteBuffAllocator allocator;

  /**
   * Time this block was cached. Presumes we are created just before we are added to the cache.
   */
  private long cachedTime = System.nanoTime();

  /**
   * @param createRecycler used to free this {@link BucketEntry} when {@link BucketEntry#refCnt}
   *                       becomes 0. NOTICE that {@link ByteBuffAllocator#NONE} should only be used
   *                       in tests.
   */
  BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter,
    boolean inMemory, Function<BucketEntry, Recycler> createRecycler, ByteBuffAllocator allocator) {
    this(offset, length, onDiskSizeWithHeader, accessCounter, System.nanoTime(), inMemory,
      createRecycler, allocator);
  }

  BucketEntry(long offset, int length, int onDiskSizeWithHeader, long accessCounter,
    long cachedTime, boolean inMemory, Function<BucketEntry, Recycler> createRecycler,
    ByteBuffAllocator allocator) {
    if (createRecycler == null) {
      throw new IllegalArgumentException("createRecycler could not be null!");
    }
    setOffset(offset);
    this.length = length;
    this.onDiskSizeWithHeader = onDiskSizeWithHeader;
    this.accessCounter = accessCounter;
    this.cachedTime = cachedTime;
    this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
    this.refCnt = RefCnt.create(createRecycler.apply(this));
    this.markedAsEvicted = new AtomicBoolean(false);
    this.allocator = allocator;
  }
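
  /*
   * Illustrative construction sketch (not a real call site): the Recycler produced by
   * createRecycler is what RefCnt invokes once the count drops to zero. The lambda and
   * freeBlockArea() below are hypothetical placeholders for whatever the caller uses to return the
   * bucket space, and ByteBuffAllocator.HEAP stands in for the caller's allocator.
   *
   *   Function<BucketEntry, Recycler> createRecycler =
   *     entry -> () -> freeBlockArea(entry);             // hypothetical free routine
   *   BucketEntry entry = new BucketEntry(offset, len, onDiskLen, accessCount,
   *     false, createRecycler, ByteBuffAllocator.HEAP);  // inMemory == false -> MULTI priority
   */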

  long offset() {
    // Java has no unsigned numbers, so this needs the L cast otherwise it will be sign extended
    // as a negative number.
    long o = ((long) offsetBase) & 0xFFFFFFFFL;
    // The 0xFF here does not need the L cast because it is treated as a positive int.
    o += (((long) (offset1)) & 0xFF) << 32;
    return o << 8;
  }

  private void setOffset(long value) {
    assert (value & 0xFF) == 0;
    value >>= 8;
    offsetBase = (int) value;
    offset1 = (byte) (value >> 32);
  }
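
  /*
   * Worked example of the 5-byte offset encoding above (illustrative only). Offsets are always
   * multiples of 256, so the low 8 bits are dropped and the remaining 40 bits are split across the
   * int offsetBase (low 32 bits) and the byte offset1 (high 8 bits); (2^40 - 1) * 256 is the
   * roughly 256TB mentioned in the class comment.
   *
   *   setOffset(0x1_2345_6789_00L);  // value >> 8 == 0x1_2345_6789
   *                                  // offsetBase == 0x2345_6789, offset1 == 0x01
   *   offset();                      // (0x2345_6789 + (0x01L << 32)) << 8 == 0x1_2345_6789_00
   */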

  public int getLength() {
    return length;
  }

  CacheableDeserializer<Cacheable> deserializerReference() {
    return CacheableDeserializerIdManager.getDeserializer(deserializerIndex);
  }

  void setDeserializerReference(CacheableDeserializer<Cacheable> deserializer) {
    this.deserializerIndex = (byte) deserializer.getDeserializerIdentifier();
  }

  long getAccessCounter() {
    return accessCounter;
  }

  /**
   * Block has been accessed. Update its local access counter.
   */
  void access(long accessCounter) {
    this.accessCounter = accessCounter;
    if (this.priority == BlockPriority.SINGLE) {
      this.priority = BlockPriority.MULTI;
    }
  }

  public BlockPriority getPriority() {
    return this.priority;
  }

  public long getCachedTime() {
    return cachedTime;
  }

  public int getOnDiskSizeWithHeader() {
    return onDiskSizeWithHeader;
  }

  /**
   * The {@link BucketCache} may try to release its reference to this BucketEntry multiple times,
   * so we must make sure releasing is idempotent; otherwise it would decrease the RPC's reference
   * count prematurely and cause an RPC memory leak.
   * @return true if we deallocate this entry successfully.
   */
  boolean markAsEvicted() {
    if (markedAsEvicted.compareAndSet(false, true)) {
      return this.release();
    }
    return false;
  }

  /**
   * Check whether any RPC path is referring to this block.<br/>
   * When {@link IOEngine#usesSharedMemory()} is true (e.g. {@link ByteBufferIOEngine}), there are
   * two cases: <br>
   * 1. If the current refCnt is greater than 1, there must be at least one referring RPC path; <br>
   * 2. If the current refCnt is equal to 1 and markedAsEvicted is true, it means the backingMap has
   * released its reference, so the remaining reference can only come from an RPC path. <br>
   * We use this check to decide whether we can free the block area: when the cached size exceeds
   * the acceptable size, our eviction policy will choose stale blocks without any RPC reference,
   * and RPC-referred blocks are excluded. <br/>
   * <br/>
   * When {@link IOEngine#usesSharedMemory()} is false (e.g. {@link FileIOEngine}),
   * {@link BucketEntry#refCnt} is always 1 until it is evicted from {@link BucketCache#backingMap},
   * so {@link BucketEntry#isRpcRef()} always returns false.
   * @return true if some RPC path is referring to this block.
   */
  boolean isRpcRef() {
    boolean evicted = markedAsEvicted.get();
    return this.refCnt() > 1 || (evicted && refCnt() == 1);
  }
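
  /*
   * Illustrative walk-through of the states isRpcRef() distinguishes for a shared-memory engine
   * (the direct retain()/markAsEvicted()/release() calls below stand in for the real read and
   * eviction paths in BucketCache):
   *
   *   // backingMap reference only: refCnt == 1, markedAsEvicted == false -> isRpcRef() == false
   *   entry.retain();             // an RPC read path starts referring to the block
   *   // backingMap + one RPC:     refCnt == 2                            -> isRpcRef() == true
   *   entry.markAsEvicted();      // backingMap drops its reference
   *   // RPC reference only:       refCnt == 1, markedAsEvicted == true   -> isRpcRef() == true,
   *   //                           so the eviction policy leaves the block area alone until the
   *   //                           RPC releases it.
   */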

  Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
    return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt));
  }

  Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
    return this.deserializerReference().deserialize(buf, allocator);
  }

  interface BucketEntryHandler<T> {
    T handle();
  }

  <T> T withWriteLock(IdReadWriteLock<Long> offsetLock, BucketEntryHandler<T> handler) {
    ReentrantReadWriteLock lock = offsetLock.getLock(this.offset());
    try {
      lock.writeLock().lock();
      return handler.handle();
    } finally {
      lock.writeLock().unlock();
    }
  }
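
  /*
   * Usage sketch (illustrative; offsetLock, backingMap and key live in BucketCache, not in this
   * class): callers serialize mutations on this entry's bucket offset by running them under the
   * per-offset write lock, e.g. removing the entry from the backing map atomically with respect
   * to readers:
   *
   *   boolean removed = entry.withWriteLock(offsetLock, () -> backingMap.remove(key, entry));
   */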

  @Override
  public int refCnt() {
    return this.refCnt.refCnt();
  }

  @Override
  public BucketEntry retain() {
    refCnt.retain();
    return this;
  }

  /**
   * There are three cases in which we release the refCnt: <br>
   * 1. BucketCache#evictBlock releases the backingMap's reference by force because we are closing
   * the file, clearing the bucket cache, or some corruption happened; once all RPC references are
   * gone, the area in the bucketAllocator is freed. <br>
   * 2. BucketCache#returnBlock releases the block when the RPC has shipped; only when the
   * backingMap has also released its refCnt (case 1 does this) and there is no other RPC reference
   * will the area in the bucketAllocator be freed. <br>
   * 3. Evicting blocks without any RPC reference when the cache size is exceeded; we only free
   * those blocks with a zero RPC reference count.
   * @return true if the refCnt decreased to zero and we did the de-allocation.
   */
  @Override
  public boolean release() {
    return refCnt.release();
  }
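
  /*
   * Release-ordering sketch for the cases above (illustrative only; the real callers live in
   * BucketCache). The entry starts with refCnt == 1 for the backingMap reference:
   *
   *   entry.retain();                    // an RPC read path adds its reference (refCnt == 2)
   *   entry.markAsEvicted();             // eviction drops the backingMap reference exactly once;
   *                                      // later calls are no-ops, so no double release
   *   boolean freed = entry.release();   // true: this drop took refCnt to zero, so the recycler
   *                                      // returned the block area to the allocator
   */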
}