001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one or more
005 * contributor license agreements. See the NOTICE file distributed with this
006 * work for additional information regarding copyright ownership. The ASF
007 * licenses this file to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
016 * License for the specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile.bucket;
020
021
022import java.util.Comparator;
023import java.util.Map;
024
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
027
028import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
029
030/**
031 * A memory-bound queue that will grow until an element brings total size larger
032 * than maxSize. From then on, only entries that are sorted larger than the
033 * smallest current entry will be inserted/replaced.
034 * 
035 * <p>
036 * Use this when you want to find the largest elements (according to their
037 * ordering, not their heap size) that consume as close to the specified maxSize
038 * as possible. Default behavior is to grow just above rather than just below
039 * specified max.
040 */
041@InterfaceAudience.Private
042public class CachedEntryQueue {
043
044  private static final Comparator<Map.Entry<BlockCacheKey, BucketEntry>> COMPARATOR =
045    (a, b) -> BucketEntry.COMPARATOR.compare(a.getValue(), b.getValue());
046
047  private MinMaxPriorityQueue<Map.Entry<BlockCacheKey, BucketEntry>> queue;
048
049  private long cacheSize;
050  private long maxSize;
051
052  /**
053   * @param maxSize the target size of elements in the queue
054   * @param blockSize expected average size of blocks
055   */
056  public CachedEntryQueue(long maxSize, long blockSize) {
057    int initialSize = (int) (maxSize / blockSize);
058    if (initialSize == 0) {
059      initialSize++;
060    }
061    queue = MinMaxPriorityQueue.orderedBy(COMPARATOR).expectedSize(initialSize).create();
062    cacheSize = 0;
063    this.maxSize = maxSize;
064  }
065
066  /**
067   * Attempt to add the specified entry to this queue.
068   * <p>
069   * If the queue is smaller than the max size, or if the specified element is
070   * ordered after the smallest element in the queue, the element will be added
071   * to the queue. Otherwise, there is no side effect of this call.
072   * @param entry a bucket entry with key to try to add to the queue
073   */
074  public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) {
075    if (cacheSize < maxSize) {
076      queue.add(entry);
077      cacheSize += entry.getValue().getLength();
078    } else {
079      BucketEntry head = queue.peek().getValue();
080      if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) > 0) {
081        cacheSize += entry.getValue().getLength();
082        cacheSize -= head.getLength();
083        if (cacheSize > maxSize) {
084          queue.poll();
085        } else {
086          cacheSize += head.getLength();
087        }
088        queue.add(entry);
089      }
090    }
091  }
092
093  /**
094   * @return The next element in this queue, or {@code null} if the queue is
095   *         empty.
096   */
097  public Map.Entry<BlockCacheKey, BucketEntry> poll() {
098    return queue.poll();
099  }
100
101  /**
102   * @return The last element in this queue, or {@code null} if the queue is
103   *         empty.
104   */
105  public Map.Entry<BlockCacheKey, BucketEntry> pollLast() {
106    return queue.pollLast();
107  }
108}