001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import org.apache.hadoop.hbase.io.HeapSize;
021import org.apache.yetus.audience.InterfaceAudience;
022
023import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
024import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
025
026/**
027 * A memory-bound queue that will grow until an element brings total size >= maxSize. From then
028 * on, only entries that are sorted larger than the smallest current entry will be
029 * inserted/replaced.
030 * <p>
031 * Use this when you want to find the largest elements (according to their ordering, not their heap
032 * size) that consume as close to the specified maxSize as possible. Default behavior is to grow
033 * just above rather than just below specified max.
034 * <p>
035 * Object used in this queue must implement {@link HeapSize} as well as {@link Comparable}.
036 */
037@InterfaceAudience.Private
038public class LruCachedBlockQueue implements HeapSize {
039
040  private MinMaxPriorityQueue<LruCachedBlock> queue;
041
042  private long heapSize;
043  private long maxSize;
044
045  /**
046   * @param maxSize   the target size of elements in the queue
047   * @param blockSize expected average size of blocks
048   */
049  public LruCachedBlockQueue(long maxSize, long blockSize) {
050    Preconditions.checkArgument(blockSize > 0, "negative blockSize %s", blockSize);
051    Preconditions.checkArgument(maxSize > 0, "negative maxSize %s", maxSize);
052    int initialSize = (int) (maxSize / blockSize);
053    if (initialSize == 0) {
054      initialSize++;
055    }
056    queue = MinMaxPriorityQueue.expectedSize(initialSize).create();
057    heapSize = 0;
058    this.maxSize = maxSize;
059  }
060
061  /**
062   * Attempt to add the specified cached block to this queue.
063   * <p>
064   * If the queue is smaller than the max size, or if the specified element is ordered before the
065   * smallest element in the queue, the element will be added to the queue. Otherwise, there is no
066   * side effect of this call.
067   * @param cb block to try to add to the queue
068   */
069  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
070      value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
071      justification = "head can not be null as heapSize is greater than maxSize,"
072        + " which means we have something in the queue")
073  public void add(LruCachedBlock cb) {
074    if (heapSize < maxSize) {
075      queue.add(cb);
076      heapSize += cb.heapSize();
077    } else {
078      LruCachedBlock head = queue.peek();
079      if (cb.compareTo(head) > 0) {
080        heapSize += cb.heapSize();
081        heapSize -= head.heapSize();
082        if (heapSize > maxSize) {
083          queue.poll();
084        } else {
085          heapSize += head.heapSize();
086        }
087        queue.add(cb);
088      }
089    }
090  }
091
092  /** Returns The next element in this queue, or {@code null} if the queue is empty. */
093  public LruCachedBlock poll() {
094    return queue.poll();
095  }
096
097  /** Returns The last element in this queue, or {@code null} if the queue is empty. */
098  public LruCachedBlock pollLast() {
099    return queue.pollLast();
100  }
101
102  /**
103   * Total size of all elements in this queue.
104   * @return size of all elements currently in queue, in bytes
105   */
106  @Override
107  public long heapSize() {
108    return heapSize;
109  }
110}