001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import java.util.Comparator;
021import java.util.Map;
022import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
023import org.apache.yetus.audience.InterfaceAudience;
024
025import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
026import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
027
028/**
029 * A memory-bound queue that will grow until an element brings total size larger than maxSize. From
030 * then on, only entries that are sorted larger than the smallest current entry will be
031 * inserted/replaced.
032 * <p>
033 * Use this when you want to find the largest elements (according to their ordering, not their heap
034 * size) that consume as close to the specified maxSize as possible. Default behavior is to grow
035 * just above rather than just below specified max.
036 */
037@InterfaceAudience.Private
038public class CachedEntryQueue {
039
040  private static final Comparator<Map.Entry<BlockCacheKey, BucketEntry>> COMPARATOR =
041    (a, b) -> BucketEntry.COMPARATOR.compare(a.getValue(), b.getValue());
042
043  private MinMaxPriorityQueue<Map.Entry<BlockCacheKey, BucketEntry>> queue;
044
045  private long cacheSize;
046  private long maxSize;
047
048  /**
049   * @param maxSize   the target size of elements in the queue
050   * @param blockSize expected average size of blocks
051   */
052  public CachedEntryQueue(long maxSize, long blockSize) {
053    Preconditions.checkArgument(blockSize > 0, "negative blockSize %s", blockSize);
054    Preconditions.checkArgument(maxSize > 0, "negative maxSize %s", maxSize);
055    int initialSize = (int) (maxSize / blockSize);
056    if (initialSize == 0) {
057      initialSize++;
058    }
059    queue = MinMaxPriorityQueue.orderedBy(COMPARATOR).expectedSize(initialSize).create();
060    cacheSize = 0;
061    this.maxSize = maxSize;
062  }
063
064  /**
065   * Attempt to add the specified entry to this queue.
066   * <p>
067   * If the queue is smaller than the max size, or if the specified element is ordered after the
068   * smallest element in the queue, the element will be added to the queue. Otherwise, there is no
069   * side effect of this call.
070   * @param entry a bucket entry with key to try to add to the queue
071   */
072  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
073      value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
074      justification = "head can not be null as cacheSize is greater than maxSize,"
075        + " which means we have something in the queue")
076  public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) {
077    if (cacheSize < maxSize) {
078      queue.add(entry);
079      cacheSize += entry.getValue().getLength();
080    } else {
081      BucketEntry head = queue.peek().getValue();
082      if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) > 0) {
083        cacheSize += entry.getValue().getLength();
084        cacheSize -= head.getLength();
085        if (cacheSize > maxSize) {
086          queue.poll();
087        } else {
088          cacheSize += head.getLength();
089        }
090        queue.add(entry);
091      }
092    }
093  }
094
095  /** Returns The next element in this queue, or {@code null} if the queue is empty. */
096  public Map.Entry<BlockCacheKey, BucketEntry> poll() {
097    return queue.poll();
098  }
099
100  /** Returns The last element in this queue, or {@code null} if the queue is empty. */
101  public Map.Entry<BlockCacheKey, BucketEntry> pollLast() {
102    return queue.pollLast();
103  }
104}