001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.util.AbstractQueue;
021import java.util.Collection;
022import java.util.Comparator;
023import java.util.Iterator;
024import java.util.Objects;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.locks.Condition;
028import java.util.concurrent.locks.ReentrantLock;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.yetus.audience.InterfaceStability;
031
032/**
033 * A generic bounded blocking Priority-Queue. The elements of the priority queue are ordered
034 * according to the Comparator provided at queue construction time. If multiple elements have the
035 * same priority this queue orders them in FIFO (first-in-first-out) manner. The head of this queue
036 * is the least element with respect to the specified ordering. If multiple elements are tied for
037 * least value, the head is the first one inserted. The queue retrieval operations poll, remove,
038 * peek, and element access the element at the head of the queue.
039 */
040@InterfaceAudience.Private
041@InterfaceStability.Stable
public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
  /**
   * Fixed-capacity, array-backed sorted buffer. It performs no locking of its own; every access
   * from the enclosing class happens while holding {@code lock}.
   * <p>
   * Live elements occupy {@code objects[head, tail)} in ascending comparator order, with elements
   * that compare equal kept in insertion order.
   */
  private static class PriorityQueue<E> {
    private final Comparator<? super E> comparator;
    private final E[] objects;

    // Index of the least element (inclusive) and one past the greatest element (exclusive).
    private int head = 0;
    private int tail = 0;

    @SuppressWarnings("unchecked")
    public PriorityQueue(int capacity, Comparator<? super E> comparator) {
      // Only E instances are ever stored and the array never escapes this class.
      this.objects = (E[]) new Object[capacity];
      this.comparator = comparator;
    }

    /**
     * Inserts {@code elem} at its sorted position, after any elements that compare equal to it.
     * The caller must make sure {@link #remainingCapacity()} is greater than zero.
     * @param elem the element to insert
     */
    public void add(E elem) {
      if (tail == objects.length) {
        // shift down |-----AAAAAAA| -> |AAAAAAA-----|
        int oldTail = tail;
        tail -= head;
        System.arraycopy(objects, head, objects, 0, tail);
        head = 0;
        // The vacated upper slots still reference the moved elements; clear them so that
        // elements polled later do not stay reachable through these stale copies.
        for (int i = tail; i < oldTail; i++) {
          objects[i] = null;
        }
      }

      if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) {
        // Append: empty queue, or elem is not smaller than the current greatest element.
        objects[tail++] = elem;
      } else if (head > 0 && comparator.compare(objects[head], elem) > 0) {
        // Prepend: elem is strictly smaller than the least element and there is room in front.
        objects[--head] = elem;
      } else {
        // Insert in the middle. objects[tail - 1] is known to be greater than elem, so the
        // insertion point lies in [head, tail - 1].
        int index = upperBound(head, tail - 1, elem);
        System.arraycopy(objects, index, objects, index + 1, tail - index);
        objects[index] = elem;
        tail++;
      }
    }

    /** Returns the least element without removing it, or {@code null} if the queue is empty. */
    public E peek() {
      return (head != tail) ? objects[head] : null;
    }

    /** Removes and returns the least element, or returns {@code null} if the queue is empty. */
    public E poll() {
      if (head == tail) {
        // Nothing to remove; leave the indices untouched.
        return null;
      }
      E elem = objects[head];
      objects[head] = null;
      head = (head + 1) % objects.length;
      // head wraps to 0 only after the last array slot was consumed, which means the queue is
      // now empty; restart from the beginning of the array.
      if (head == 0) {
        tail = 0;
      }
      return elem;
    }

    /** Returns the number of elements currently stored. */
    public int size() {
      return tail - head;
    }

    /** Returns the comparator used to order the elements. */
    public Comparator<? super E> comparator() {
      return this.comparator;
    }

    /**
     * Returns whether an element equal to {@code o} (per {@link Object#equals(Object)}) is stored.
     * @param o the object to look for; {@code null} is never contained
     */
    public boolean contains(Object o) {
      if (o == null) {
        return false;
      }
      for (int i = head; i < tail; ++i) {
        if (o.equals(objects[i])) {
          return true;
        }
      }
      return false;
    }

    /** Returns the number of additional elements that can be stored. */
    public int remainingCapacity() {
      return this.objects.length - (tail - head);
    }

    /**
     * Binary search over {@code objects[start, end)}.
     * @return the index of the first element strictly greater than {@code key}, or {@code end} if
     *         there is no such element in the range
     */
    private int upperBound(int start, int end, E key) {
      while (start < end) {
        int mid = start + ((end - start) >> 1);
        E mitem = objects[mid];
        int cmp = comparator.compare(mitem, key);
        if (cmp > 0) {
          end = mid;
        } else {
          // Equal elements are skipped so that ties keep their insertion (FIFO) order.
          start = mid + 1;
        }
      }
      return start;
    }
  }

  // Lock used for all operations
  private final ReentrantLock lock = new ReentrantLock();

  // Condition for blocking when empty
  private final Condition notEmpty = lock.newCondition();

  // Wait queue for waiting puts
  private final Condition notFull = lock.newCondition();

  private final PriorityQueue<E> queue;

  /**
   * Creates a PriorityQueue with the specified capacity that orders its elements according to the
   * specified comparator.
   * @param capacity   the capacity of this queue
   * @param comparator the comparator that will be used to order this priority queue
   */
  public BoundedPriorityBlockingQueue(int capacity, Comparator<? super E> comparator) {
    this.queue = new PriorityQueue<>(capacity, comparator);
  }

  /**
   * Inserts the element if the queue has room, without blocking.
   * @param e the element to add, must not be {@code null}
   * @return {@code true} if the element was added, {@code false} if the queue is full
   * @throws NullPointerException if {@code e} is {@code null}
   */
  @Override
  public boolean offer(E e) {
    Objects.requireNonNull(e);

    lock.lock();
    try {
      if (queue.remainingCapacity() > 0) {
        this.queue.add(e);
        notEmpty.signal();
        return true;
      }
    } finally {
      lock.unlock();
    }
    return false;
  }

  /**
   * Inserts the element, waiting for space to become available if the queue is full.
   * @param e the element to add, must not be {@code null}
   * @throws InterruptedException if interrupted while waiting for space
   * @throws NullPointerException if {@code e} is {@code null}
   */
  @Override
  public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);

    lock.lock();
    try {
      while (queue.remainingCapacity() == 0) {
        notFull.await();
      }
      this.queue.add(e);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Inserts the element, waiting up to the given time for space to become available.
   * @param e       the element to add, must not be {@code null}
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit    the time unit of {@code timeout}
   * @return {@code true} if the element was added, {@code false} if the wait time elapsed first
   * @throws InterruptedException if interrupted while waiting
   * @throws NullPointerException if {@code e} is {@code null}
   */
  @Override
  public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    Objects.requireNonNull(e);
    long nanos = unit.toNanos(timeout);

    lock.lockInterruptibly();
    try {
      while (queue.remainingCapacity() == 0) {
        if (nanos <= 0) {
          return false;
        }
        nanos = notFull.awaitNanos(nanos);
      }
      this.queue.add(e);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
    return true;
  }

  /**
   * Removes and returns the head of the queue, waiting until an element becomes available.
   * @return the head of the queue
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public E take() throws InterruptedException {
    E result = null;
    lock.lockInterruptibly();
    try {
      while (queue.size() == 0) {
        notEmpty.await();
      }
      result = queue.poll();
      notFull.signal();
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Removes and returns the head of the queue without blocking.
   * @return the head of the queue, or {@code null} if the queue is empty
   */
  @Override
  public E poll() {
    E result = null;
    lock.lock();
    try {
      if (queue.size() > 0) {
        result = queue.poll();
        notFull.signal();
      }
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Removes and returns the head of the queue, waiting up to the given time for an element.
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit    the time unit of {@code timeout}
   * @return the head of the queue, or {@code null} if the wait time elapsed first
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    lock.lockInterruptibly();
    E result = null;
    try {
      while (queue.size() == 0 && nanos > 0) {
        nanos = notEmpty.awaitNanos(nanos);
      }
      if (queue.size() > 0) {
        result = queue.poll();
        // Only wake a producer when a slot was actually freed.
        notFull.signal();
      }
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Returns the head of the queue without removing it.
   * @return the head of the queue, or {@code null} if the queue is empty
   */
  @Override
  public E peek() {
    lock.lock();
    try {
      return queue.peek();
    } finally {
      lock.unlock();
    }
  }

  /** Returns the number of elements currently in the queue. */
  @Override
  public int size() {
    lock.lock();
    try {
      return queue.size();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Iteration is not supported by this queue.
   * @throws UnsupportedOperationException always
   */
  @Override
  public Iterator<E> iterator() {
    throw new UnsupportedOperationException();
  }

  /** Returns the comparator used to order the elements of this queue. */
  public Comparator<? super E> comparator() {
    return queue.comparator();
  }

  /** Returns the number of additional elements this queue can accept without blocking. */
  @Override
  public int remainingCapacity() {
    lock.lock();
    try {
      return queue.remainingCapacity();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removal of arbitrary elements is not supported by this queue.
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean remove(Object o) {
    throw new UnsupportedOperationException();
  }

  /**
   * Returns whether the queue holds an element equal to {@code o}.
   * @param o the object to look for
   * @return {@code true} if an element {@code e} with {@code o.equals(e)} is present
   */
  @Override
  public boolean contains(Object o) {
    lock.lock();
    try {
      return queue.contains(o);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removes all available elements, in priority order, and adds them to {@code c}.
   * @param c the collection to transfer elements into
   * @return the number of elements transferred
   */
  @Override
  public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
  }

  /**
   * Removes at most {@code maxElements} elements, in priority order, and adds them to {@code c}.
   * Producers waiting for space are woken up for every slot that was freed.
   * @param c           the collection to transfer elements into
   * @param maxElements the maximum number of elements to transfer
   * @return the number of elements transferred
   * @throws NullPointerException     if {@code c} is {@code null}
   * @throws IllegalArgumentException if {@code c} is this queue
   */
  @Override
  public int drainTo(Collection<? super E> c, int maxElements) {
    Objects.requireNonNull(c);
    if (c == this) {
      throw new IllegalArgumentException();
    }
    if (maxElements <= 0) {
      return 0;
    }
    lock.lock();
    try {
      int limit = Math.min(queue.size(), maxElements);
      int drained = 0;
      try {
        while (drained < limit) {
          // Hand the element over before removing it, so it is not lost if c.add() throws.
          c.add(queue.peek());
          queue.poll();
          drained++;
        }
        return drained;
      } finally {
        // Each drained element freed one slot; wake that many waiting producers, even when
        // c.add() failed part-way through.
        for (int i = 0; i < drained; i++) {
          notFull.signal();
        }
      }
    } finally {
      lock.unlock();
    }
  }
}