001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.util.AbstractQueue;
021import java.util.Collection;
022import java.util.Comparator;
023import java.util.Iterator;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.locks.Condition;
027import java.util.concurrent.locks.ReentrantLock;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.yetus.audience.InterfaceStability;
030
031/**
032 * A generic bounded blocking Priority-Queue. The elements of the priority queue are ordered
033 * according to the Comparator provided at queue construction time. If multiple elements have the
034 * same priority this queue orders them in FIFO (first-in-first-out) manner. The head of this queue
035 * is the least element with respect to the specified ordering. If multiple elements are tied for
036 * least value, the head is the first one inserted. The queue retrieval operations poll, remove,
037 * peek, and element access the element at the head of the queue.
038 */
039@InterfaceAudience.Private
040@InterfaceStability.Stable
public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
  /**
   * Fixed-capacity, array-backed sorted buffer. Live elements occupy the slots
   * {@code [head, tail)} in ascending comparator order; elements that compare equal keep their
   * insertion order. This class performs no synchronization and no capacity checks: the enclosing
   * queue calls it only while holding {@code lock} and only after checking
   * {@link #remainingCapacity()} / {@link #size()}.
   */
  private static class PriorityQueue<E> {
    private final Comparator<? super E> comparator;
    private final E[] objects;

    // Index of the first live element.
    private int head = 0;
    // Index one past the last live element.
    private int tail = 0;

    /**
     * @param capacity   number of slots in the backing array
     * @param comparator ordering used to keep the array sorted
     */
    @SuppressWarnings("unchecked")
    public PriorityQueue(int capacity, Comparator<? super E> comparator) {
      // Safe: the array never leaves this class and only E instances are stored in it.
      this.objects = (E[]) new Object[capacity];
      this.comparator = comparator;
    }

    /**
     * Inserts {@code elem} at its sorted position, after any elements that compare equal to it.
     * The caller must have verified that {@link #remainingCapacity()} is positive.
     * @param elem the element to insert
     */
    public void add(E elem) {
      if (tail == objects.length) {
        // No room at the end: shift down |-----AAAAAAA| -> |AAAAAAA-----|
        int oldTail = tail;
        tail -= head;
        System.arraycopy(objects, head, objects, 0, tail);
        // Drop the stale copies left in the vacated slots so they do not keep
        // already-moved (and later removed) elements reachable.
        for (int i = tail; i < oldTail; ++i) {
          objects[i] = null;
        }
        head = 0;
      }

      if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) {
        // Append: empty, or elem is not smaller than the current last element.
        objects[tail++] = elem;
      } else if (head > 0 && comparator.compare(objects[head], elem) > 0) {
        // Prepend: elem is strictly smaller than the head and there is a free slot before it.
        objects[--head] = elem;
      } else {
        // Insert in the middle. objects[tail - 1] > elem is known here (the append test failed),
        // so upperBound always returns an index inside [head, tail - 1].
        int index = upperBound(head, tail - 1, elem);
        System.arraycopy(objects, index, objects, index + 1, tail - index);
        objects[index] = elem;
        tail++;
      }
    }

    /** Returns the head element without removing it, or {@code null} when empty. */
    public E peek() {
      return (head != tail) ? objects[head] : null;
    }

    /**
     * Removes and returns the head element. The caller must have verified that {@link #size()} is
     * positive.
     */
    public E poll() {
      E elem = objects[head];
      objects[head] = null;
      head = (head + 1) % objects.length;
      // head wrapped around, which means the last slot was just consumed: the buffer is empty.
      if (head == 0) {
        tail = 0;
      }
      return elem;
    }

    /** Returns the number of live elements. */
    public int size() {
      return tail - head;
    }

    /** Returns the comparator supplied at construction time. */
    public Comparator<? super E> comparator() {
      return this.comparator;
    }

    /**
     * Returns {@code true} if some live element is equal to {@code o} according to
     * {@link Object#equals(Object)}. A {@code null} argument never matches because null elements
     * are rejected by the enclosing queue.
     */
    public boolean contains(Object o) {
      if (o == null) {
        return false;
      }
      for (int i = head; i < tail; ++i) {
        if (o.equals(objects[i])) {
          return true;
        }
      }
      return false;
    }

    /** Returns the number of additional elements that can be stored. */
    public int remainingCapacity() {
      return this.objects.length - (tail - head);
    }

    /**
     * Binary search over {@code [start, end)} for the first slot whose element compares strictly
     * greater than {@code key}.
     * @return that index, or {@code end} when no element in {@code [start, end)} is greater
     */
    private int upperBound(int start, int end, E key) {
      while (start < end) {
        int mid = start + ((end - start) >> 1);
        E mitem = objects[mid];
        int cmp = comparator.compare(mitem, key);
        if (cmp > 0) {
          end = mid;
        } else {
          // Equal elements are skipped as well, which is what keeps ties in FIFO order.
          start = mid + 1;
        }
      }
      return start;
    }
  }

  // Lock used for all operations
  private final ReentrantLock lock = new ReentrantLock();

  // Condition for blocking when empty
  private final Condition notEmpty = lock.newCondition();

  // Wait queue for waiting puts
  private final Condition notFull = lock.newCondition();

  private final PriorityQueue<E> queue;

  /**
   * Creates a PriorityQueue with the specified capacity that orders its elements according to the
   * specified comparator.
   * @param capacity   the capacity of this queue
   * @param comparator the comparator that will be used to order this priority queue
   */
  public BoundedPriorityBlockingQueue(int capacity, Comparator<? super E> comparator) {
    this.queue = new PriorityQueue<>(capacity, comparator);
  }

  /**
   * Inserts the element if the queue has spare capacity, without blocking.
   * @param e the element to add
   * @return {@code true} if the element was added, {@code false} if the queue is full
   * @throws NullPointerException if {@code e} is null
   */
  @Override
  public boolean offer(E e) {
    if (e == null) {
      throw new NullPointerException();
    }

    lock.lock();
    try {
      if (queue.remainingCapacity() > 0) {
        this.queue.add(e);
        notEmpty.signal();
        return true;
      }
    } finally {
      lock.unlock();
    }
    return false;
  }

  /**
   * Inserts the element, waiting for space to become available if the queue is full.
   * @param e the element to add
   * @throws InterruptedException if interrupted while waiting for space
   * @throws NullPointerException if {@code e} is null
   */
  @Override
  public void put(E e) throws InterruptedException {
    if (e == null) {
      throw new NullPointerException();
    }

    lock.lock();
    try {
      while (queue.remainingCapacity() == 0) {
        notFull.await();
      }
      this.queue.add(e);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Inserts the element, waiting up to the given timeout for space to become available.
   * @param e       the element to add
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit    the time unit of {@code timeout}
   * @return {@code true} if the element was added, {@code false} if the timeout elapsed first
   * @throws InterruptedException if interrupted while waiting
   * @throws NullPointerException if {@code e} is null
   */
  @Override
  public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) {
      throw new NullPointerException();
    }
    long nanos = unit.toNanos(timeout);

    lock.lockInterruptibly();
    try {
      while (queue.remainingCapacity() == 0) {
        if (nanos <= 0) {
          return false;
        }
        nanos = notFull.awaitNanos(nanos);
      }
      this.queue.add(e);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
    return true;
  }

  /**
   * Removes and returns the head of the queue, waiting until an element becomes available.
   * @return the head element
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public E take() throws InterruptedException {
    E result = null;
    lock.lockInterruptibly();
    try {
      while (queue.size() == 0) {
        notEmpty.await();
      }
      result = queue.poll();
      notFull.signal();
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Removes and returns the head of the queue without blocking.
   * @return the head element, or {@code null} if the queue is empty
   */
  @Override
  public E poll() {
    E result = null;
    lock.lock();
    try {
      if (queue.size() > 0) {
        result = queue.poll();
        notFull.signal();
      }
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Removes and returns the head of the queue, waiting up to the given timeout for an element to
   * become available.
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit    the time unit of {@code timeout}
   * @return the head element, or {@code null} if the timeout elapsed while the queue was empty
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    lock.lockInterruptibly();
    E result = null;
    try {
      while (queue.size() == 0 && nanos > 0) {
        nanos = notEmpty.awaitNanos(nanos);
      }
      if (queue.size() > 0) {
        result = queue.poll();
        // Only wake a producer when a slot was actually freed.
        notFull.signal();
      }
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Returns the head of the queue without removing it.
   * @return the head element, or {@code null} if the queue is empty
   */
  @Override
  public E peek() {
    lock.lock();
    try {
      return queue.peek();
    } finally {
      lock.unlock();
    }
  }

  /** Returns the number of elements currently in the queue. */
  @Override
  public int size() {
    lock.lock();
    try {
      return queue.size();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Iteration is not supported by this queue.
   * @throws UnsupportedOperationException always
   */
  @Override
  public Iterator<E> iterator() {
    throw new UnsupportedOperationException();
  }

  /** Returns the comparator used to order the elements of this queue. */
  public Comparator<? super E> comparator() {
    return queue.comparator();
  }

  /** Returns the number of elements that can be added before the queue is full. */
  @Override
  public int remainingCapacity() {
    lock.lock();
    try {
      return queue.remainingCapacity();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removal of arbitrary elements is not supported by this queue.
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean remove(Object o) {
    throw new UnsupportedOperationException();
  }

  /**
   * Returns {@code true} if this queue contains an element equal to {@code o}.
   * @param o the object to look for; {@code null} never matches
   */
  @Override
  public boolean contains(Object o) {
    lock.lock();
    try {
      return queue.contains(o);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removes all available elements from this queue, in priority order, and adds them to
   * {@code c}.
   * @param c the collection to transfer elements into
   * @return the number of elements transferred
   * @throws NullPointerException     if {@code c} is null
   * @throws IllegalArgumentException if {@code c} is this queue
   */
  @Override
  public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
  }

  /**
   * Removes at most {@code maxElements} elements from this queue, in priority order, and adds
   * them to {@code c}. Producers blocked waiting for space are woken up when any slot is freed.
   * @param c           the collection to transfer elements into
   * @param maxElements the maximum number of elements to transfer
   * @return the number of elements transferred
   * @throws NullPointerException     if {@code c} is null
   * @throws IllegalArgumentException if {@code c} is this queue
   */
  @Override
  public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null) {
      throw new NullPointerException();
    }
    if (c == this) {
      throw new IllegalArgumentException();
    }
    if (maxElements <= 0) {
      return 0;
    }
    lock.lock();
    try {
      int sizeBefore = queue.size();
      int n = Math.min(sizeBefore, maxElements);
      try {
        for (int i = 0; i < n; ++i) {
          c.add(queue.poll());
        }
      } finally {
        // Several slots may have been freed at once, so wake every waiting producer. This runs
        // even if c.add() threw, because the elements polled so far have already left the queue.
        if (queue.size() < sizeBefore) {
          notFull.signalAll();
        }
      }
      return n;
    } finally {
      lock.unlock();
    }
  }
}