001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.util;
020
021import java.util.concurrent.locks.Condition;
022import java.util.concurrent.locks.ReentrantLock;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.TimeUnit;
025import java.util.Collection;
026import java.util.Comparator;
027import java.util.Iterator;
028import java.util.AbstractQueue;
029
030import org.apache.yetus.audience.InterfaceAudience;
031import org.apache.yetus.audience.InterfaceStability;
032
033
034/**
035 * A generic bounded blocking Priority-Queue.
036 *
037 * The elements of the priority queue are ordered according to the Comparator
038 * provided at queue construction time.
039 *
040 * If multiple elements have the same priority this queue orders them in
041 * FIFO (first-in-first-out) manner.
042 * The head of this queue is the least element with respect to the specified
043 * ordering. If multiple elements are tied for least value, the head is the
044 * first one inserted.
045 * The queue retrieval operations poll, remove, peek, and element access the
046 * element at the head of the queue.
047 */
048@InterfaceAudience.Private
049@InterfaceStability.Stable
050public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
051  private static class PriorityQueue<E> {
052    private final Comparator<? super E> comparator;
053    private final E[] objects;
054
055    private int head = 0;
056    private int tail = 0;
057
058    @SuppressWarnings("unchecked")
059    public PriorityQueue(int capacity, Comparator<? super E> comparator) {
060      this.objects = (E[])new Object[capacity];
061      this.comparator = comparator;
062    }
063
064    public void add(E elem) {
065      if (tail == objects.length) {
066        // shift down |-----AAAAAAA|
067        tail -= head;
068        System.arraycopy(objects, head, objects, 0, tail);
069        head = 0;
070      }
071
072      if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) {
073        // Append
074        objects[tail++] = elem;
075      } else if (head > 0 && comparator.compare(objects[head], elem) > 0) {
076        // Prepend
077        objects[--head] = elem;
078      } else {
079        // Insert in the middle
080        int index = upperBound(head, tail - 1, elem);
081        System.arraycopy(objects, index, objects, index + 1, tail - index);
082        objects[index] = elem;
083        tail++;
084      }
085    }
086
087    public E peek() {
088      return (head != tail) ? objects[head] : null;
089    }
090
091    public E poll() {
092      E elem = objects[head];
093      objects[head] = null;
094      head = (head + 1) % objects.length;
095      if (head == 0) tail = 0;
096      return elem;
097    }
098
099    public int size() {
100      return tail - head;
101    }
102
103    public Comparator<? super E> comparator() {
104      return this.comparator;
105    }
106
107    public boolean contains(Object o) {
108      for (int i = head; i < tail; ++i) {
109        if (objects[i] == o) {
110          return true;
111        }
112      }
113      return false;
114    }
115
116    public int remainingCapacity() {
117      return this.objects.length - (tail - head);
118    }
119
120    private int upperBound(int start, int end, E key) {
121      while (start < end) {
122        int mid = start + ((end - start) >> 1);
123        E mitem = objects[mid];
124        int cmp = comparator.compare(mitem, key);
125        if (cmp > 0) {
126          end = mid;
127        } else {
128          start = mid + 1;
129        }
130      }
131      return start;
132    }
133  }
134
135
136  // Lock used for all operations
137  private final ReentrantLock lock = new ReentrantLock();
138
139  // Condition for blocking when empty
140  private final Condition notEmpty = lock.newCondition();
141
142  // Wait queue for waiting puts
143  private final Condition notFull = lock.newCondition();
144
145  private final PriorityQueue<E> queue;
146
147  /**
148   * Creates a PriorityQueue with the specified capacity that orders its
149   * elements according to the specified comparator.
150   * @param capacity the capacity of this queue
151   * @param comparator the comparator that will be used to order this priority queue
152   */
153  public BoundedPriorityBlockingQueue(int capacity,
154      Comparator<? super E> comparator) {
155    this.queue = new PriorityQueue<>(capacity, comparator);
156  }
157
158  @Override
159  public boolean offer(E e) {
160    if (e == null) throw new NullPointerException();
161
162    lock.lock();
163    try {
164      if (queue.remainingCapacity() > 0) {
165        this.queue.add(e);
166        notEmpty.signal();
167        return true;
168      }
169    } finally {
170      lock.unlock();
171    }
172    return false;
173  }
174
175  @Override
176  public void put(E e) throws InterruptedException {
177    if (e == null) throw new NullPointerException();
178
179    lock.lock();
180    try {
181      while (queue.remainingCapacity() == 0) {
182        notFull.await();
183      }
184      this.queue.add(e);
185      notEmpty.signal();
186    } finally {
187      lock.unlock();
188    }
189  }
190
191  @Override
192  public boolean offer(E e, long timeout, TimeUnit unit)
193      throws InterruptedException {
194    if (e == null) throw new NullPointerException();
195    long nanos = unit.toNanos(timeout);
196
197    lock.lockInterruptibly();
198    try {
199      while (queue.remainingCapacity() == 0) {
200        if (nanos <= 0)
201          return false;
202        nanos = notFull.awaitNanos(nanos);
203      }
204      this.queue.add(e);
205      notEmpty.signal();
206    } finally {
207      lock.unlock();
208    }
209    return true;
210  }
211
212  @Override
213  public E take() throws InterruptedException {
214    E result = null;
215    lock.lockInterruptibly();
216    try {
217      while (queue.size() == 0) {
218        notEmpty.await();
219      }
220      result = queue.poll();
221      notFull.signal();
222    } finally {
223      lock.unlock();
224    }
225    return result;
226  }
227
228  @Override
229  public E poll() {
230    E result = null;
231    lock.lock();
232    try {
233      if (queue.size() > 0) {
234        result = queue.poll();
235        notFull.signal();
236      }
237    } finally {
238      lock.unlock();
239    }
240    return result;
241  }
242
243  @Override
244  public E poll(long timeout, TimeUnit unit)
245      throws InterruptedException {
246    long nanos = unit.toNanos(timeout);
247    lock.lockInterruptibly();
248    E result = null;
249    try {
250      while (queue.size() == 0 && nanos > 0) {
251        nanos = notEmpty.awaitNanos(nanos);
252      }
253      if (queue.size() > 0) {
254        result = queue.poll();
255      }
256      notFull.signal();
257    } finally {
258      lock.unlock();
259    }
260    return result;
261  }
262
263  @Override
264  public E peek() {
265    lock.lock();
266    try {
267      return queue.peek();
268    } finally {
269      lock.unlock();
270    }
271  }
272
273  @Override
274  public int size() {
275    lock.lock();
276    try {
277      return queue.size();
278    } finally {
279      lock.unlock();
280    }
281  }
282
283  @Override
284  public Iterator<E> iterator() {
285    throw new UnsupportedOperationException();
286  }
287
288  public Comparator<? super E> comparator() {
289    return queue.comparator();
290  }
291
292  @Override
293  public int remainingCapacity() {
294    lock.lock();
295    try {
296      return queue.remainingCapacity();
297    } finally {
298      lock.unlock();
299    }
300  }
301
302  @Override
303  public boolean remove(Object o) {
304    throw new UnsupportedOperationException();
305  }
306
307  @Override
308  public boolean contains(Object o) {
309    lock.lock();
310    try {
311      return queue.contains(o);
312    } finally {
313      lock.unlock();
314    }
315  }
316
317  @Override
318  public int drainTo(Collection<? super E> c) {
319    return drainTo(c, Integer.MAX_VALUE);
320  }
321
322  @Override
323  public int drainTo(Collection<? super E> c, int maxElements) {
324    if (c == null)
325        throw new NullPointerException();
326    if (c == this)
327        throw new IllegalArgumentException();
328    if (maxElements <= 0)
329        return 0;
330    lock.lock();
331    try {
332      int n = Math.min(queue.size(), maxElements);
333      for (int i = 0; i < n; ++i) {
334        c.add(queue.poll());
335      }
336      return n;
337    } finally {
338      lock.unlock();
339    }
340  }
341}