001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one or more
005 * contributor license agreements. See the NOTICE file distributed with this
006 * work for additional information regarding copyright ownership. The ASF
007 * licenses this file to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
016 * License for the specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile.bucket;
020
021
022import java.util.Comparator;
023import java.util.Map;
024import java.util.Map.Entry;
025
026import org.apache.yetus.audience.InterfaceAudience;
027import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
028import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
029
030import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
031
032/**
033 * A memory-bound queue that will grow until an element brings total size larger
034 * than maxSize. From then on, only entries that are sorted larger than the
035 * smallest current entry will be inserted/replaced.
036 * 
037 * <p>
038 * Use this when you want to find the largest elements (according to their
039 * ordering, not their heap size) that consume as close to the specified maxSize
040 * as possible. Default behavior is to grow just above rather than just below
041 * specified max.
042 */
043@InterfaceAudience.Private
044public class CachedEntryQueue {
045
046  private MinMaxPriorityQueue<Map.Entry<BlockCacheKey, BucketEntry>> queue;
047
048  private long cacheSize;
049  private long maxSize;
050
051  /**
052   * @param maxSize the target size of elements in the queue
053   * @param blockSize expected average size of blocks
054   */
055  public CachedEntryQueue(long maxSize, long blockSize) {
056    int initialSize = (int) (maxSize / blockSize);
057    if (initialSize == 0) {
058      initialSize++;
059    }
060    queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
061
062      @Override
063      public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
064          Entry<BlockCacheKey, BucketEntry> entry2) {
065        return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue());
066      }
067
068    }).expectedSize(initialSize).create();
069    cacheSize = 0;
070    this.maxSize = maxSize;
071  }
072
073  /**
074   * Attempt to add the specified entry to this queue.
075   * <p>
076   * If the queue is smaller than the max size, or if the specified element is
077   * ordered after the smallest element in the queue, the element will be added
078   * to the queue. Otherwise, there is no side effect of this call.
079   * @param entry a bucket entry with key to try to add to the queue
080   */
081  public void add(Map.Entry<BlockCacheKey, BucketEntry> entry) {
082    if (cacheSize < maxSize) {
083      queue.add(entry);
084      cacheSize += entry.getValue().getLength();
085    } else {
086      BucketEntry head = queue.peek().getValue();
087      if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) > 0) {
088        cacheSize += entry.getValue().getLength();
089        cacheSize -= head.getLength();
090        if (cacheSize > maxSize) {
091          queue.poll();
092        } else {
093          cacheSize += head.getLength();
094        }
095        queue.add(entry);
096      }
097    }
098  }
099
100  /**
101   * @return The next element in this queue, or {@code null} if the queue is
102   *         empty.
103   */
104  public Map.Entry<BlockCacheKey, BucketEntry> poll() {
105    return queue.poll();
106  }
107
108  /**
109   * @return The last element in this queue, or {@code null} if the queue is
110   *         empty.
111   */
112  public Map.Entry<BlockCacheKey, BucketEntry> pollLast() {
113    return queue.pollLast();
114  }
115
116  /**
117   * Total size of all elements in this queue.
118   * @return size of all elements currently in queue, in bytes
119   */
120  public long cacheSize() {
121    return cacheSize;
122  }
123}