View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.util;
20  
21  import java.util.concurrent.locks.Condition;
22  import java.util.concurrent.locks.ReentrantLock;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.TimeUnit;
25  import java.util.Collection;
26  import java.util.Comparator;
27  import java.util.Iterator;
28  import java.util.AbstractQueue;
29  
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.classification.InterfaceStability;
32  
33  
34  /**
35   * A generic bounded blocking Priority-Queue.
36   *
37   * The elements of the priority queue are ordered according to the Comparator
38   * provided at queue construction time.
39   *
40   * If multiple elements have the same priority this queue orders them in
41   * FIFO (first-in-first-out) manner.
42   * The head of this queue is the least element with respect to the specified
43   * ordering. If multiple elements are tied for least value, the head is the
44   * first one inserted.
45   * The queue retrieval operations poll, remove, peek, and element access the
46   * element at the head of the queue.
47   */
48  @InterfaceAudience.Private
49  @InterfaceStability.Stable
public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
  /**
   * Fixed-capacity, array-backed buffer that keeps its elements sorted
   * according to the supplied comparator.
   *
   * The live elements occupy the slots {@code objects[head, tail)}. This class
   * performs no locking and no capacity checks of its own; the enclosing queue
   * calls it only while holding {@code lock} and only adds when
   * {@link #remainingCapacity()} is positive.
   */
  private static class PriorityQueue<E> {
    private final Comparator<? super E> comparator;
    private final E[] objects;

    // Index of the least element (inclusive) and of the first free slot (exclusive).
    private int head = 0;
    private int tail = 0;

    /**
     * @param capacity the fixed number of slots in the backing array
     * @param comparator the ordering used to sort the elements
     */
    @SuppressWarnings("unchecked")
    public PriorityQueue(int capacity, Comparator<? super E> comparator) {
      this.objects = (E[])new Object[capacity];
      this.comparator = comparator;
    }

    /**
     * Inserts the element at its sorted position. Elements that compare equal
     * to existing ones are placed after them, which preserves FIFO order
     * among ties.
     * @param elem the element to insert; the caller must ensure there is room
     */
    public void add(E elem) {
      if (tail == objects.length) {
        // shift down |-----AAAAAAA|
        int oldTail = tail;
        tail -= head;
        System.arraycopy(objects, head, objects, 0, tail);
        // Drop the stale copies left behind by the shift so that elements
        // removed later do not stay reachable through the backing array.
        java.util.Arrays.fill(objects, tail, oldTail, null);
        head = 0;
      }

      if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) {
        // Append: empty buffer, or elem is not smaller than the current greatest
        objects[tail++] = elem;
      } else if (head > 0 && comparator.compare(objects[head], elem) > 0) {
        // Prepend: elem is strictly smaller than the head and a slot is free before it
        objects[--head] = elem;
      } else {
        // Insert in the middle, after every element that is <= elem
        int index = upperBound(head, tail - 1, elem);
        System.arraycopy(objects, index, objects, index + 1, tail - index);
        objects[index] = elem;
        tail++;
      }
    }

    /**
     * @return the least element without removing it, or null if the buffer is empty
     */
    public E peek() {
      return (head != tail) ? objects[head] : null;
    }

    /**
     * Removes and returns the least element.
     * @return the removed element, or null if the buffer is empty
     */
    public E poll() {
      if (head == tail) {
        return null;
      }
      E elem = objects[head];
      objects[head] = null;
      head = (head + 1) % objects.length;
      // head wraps to 0 only after the last slot was consumed, which means the
      // buffer is now empty: restart from the beginning of the array.
      if (head == 0) tail = 0;
      return elem;
    }

    /**
     * @return the number of elements currently stored
     */
    public int size() {
      return tail - head;
    }

    /**
     * @return the comparator supplied at construction time
     */
    public Comparator<? super E> comparator() {
      return this.comparator;
    }

    /**
     * @param o the object to look for
     * @return true if some stored element is equal to {@code o}; always false for null
     */
    public boolean contains(Object o) {
      if (o == null) {
        return false;
      }
      for (int i = head; i < tail; ++i) {
        if (o.equals(objects[i])) {
          return true;
        }
      }
      return false;
    }

    /**
     * @return the number of elements that can still be added
     */
    public int remainingCapacity() {
      return this.objects.length - (tail - head);
    }

    /**
     * Binary search over {@code objects[start, end]}.
     * @return the first index in {@code [start, end)} whose element compares
     *         greater than {@code key}, or {@code end} if there is none
     */
    private int upperBound(int start, int end, E key) {
      while (start < end) {
        int mid = (start + end) >>> 1;
        E mitem = objects[mid];
        int cmp = comparator.compare(mitem, key);
        if (cmp > 0) {
          end = mid;
        } else {
          start = mid + 1;
        }
      }
      return start;
    }
  }


  // Lock used for all operations
  private final ReentrantLock lock = new ReentrantLock();

  // Condition for blocking when empty
  private final Condition notEmpty = lock.newCondition();

  // Wait queue for waiting puts
  private final Condition notFull = lock.newCondition();

  private final PriorityQueue<E> queue;

  /**
   * Creates a PriorityQueue with the specified capacity that orders its
   * elements according to the specified comparator.
   * @param capacity the capacity of this queue
   * @param comparator the comparator that will be used to order this priority queue
   */
  public BoundedPriorityBlockingQueue(int capacity,
      Comparator<? super E> comparator) {
    this.queue = new PriorityQueue<E>(capacity, comparator);
  }

  /**
   * Inserts the element if the queue has room, without blocking.
   * @param e the element to add
   * @return true if the element was added, false if the queue is full
   * @throws NullPointerException if the element is null
   */
  @Override
  public boolean offer(E e) {
    if (e == null) throw new NullPointerException();

    lock.lock();
    try {
      if (queue.remainingCapacity() > 0) {
        this.queue.add(e);
        notEmpty.signal();
        return true;
      }
    } finally {
      lock.unlock();
    }
    return false;
  }

  /**
   * Inserts the element, waiting for space to become available if the queue
   * is full. The lock itself is acquired non-interruptibly; interruption is
   * observed while waiting for space.
   * @param e the element to add
   * @throws InterruptedException if interrupted while waiting for space
   * @throws NullPointerException if the element is null
   */
  @Override
  public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();

    lock.lock();
    try {
      while (queue.remainingCapacity() == 0) {
        notFull.await();
      }
      this.queue.add(e);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Inserts the element, waiting up to the given time for space to become
   * available if the queue is full.
   * @param e the element to add
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit the time unit of the {@code timeout} argument
   * @return true if the element was added, false if the wait timed out
   * @throws InterruptedException if interrupted while waiting
   * @throws NullPointerException if the element is null
   */
  @Override
  public boolean offer(E e, long timeout, TimeUnit unit)
      throws InterruptedException {
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);

    lock.lockInterruptibly();
    try {
      while (queue.remainingCapacity() == 0) {
        if (nanos <= 0)
          return false;
        nanos = notFull.awaitNanos(nanos);
      }
      this.queue.add(e);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
    return true;
  }

  /**
   * Removes and returns the head of the queue, waiting if necessary until an
   * element becomes available.
   * @return the head of the queue
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public E take() throws InterruptedException {
    E result = null;
    lock.lockInterruptibly();
    try {
      while (queue.size() == 0) {
        notEmpty.await();
      }
      result = queue.poll();
      notFull.signal();
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Removes and returns the head of the queue without blocking.
   * @return the head of the queue, or null if the queue is empty
   */
  @Override
  public E poll() {
    E result = null;
    lock.lock();
    try {
      if (queue.size() > 0) {
        result = queue.poll();
        notFull.signal();
      }
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Removes and returns the head of the queue, waiting up to the given time
   * for an element to become available.
   * @param timeout how long to wait before giving up, in units of {@code unit}
   * @param unit the time unit of the {@code timeout} argument
   * @return the head of the queue, or null if the wait timed out
   * @throws InterruptedException if interrupted while waiting
   */
  @Override
  public E poll(long timeout, TimeUnit unit)
      throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    lock.lockInterruptibly();
    E result = null;
    try {
      while (queue.size() == 0 && nanos > 0) {
        nanos = notEmpty.awaitNanos(nanos);
      }
      if (queue.size() > 0) {
        result = queue.poll();
        // Wake a producer only when a slot was actually freed.
        notFull.signal();
      }
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * @return the head of the queue without removing it, or null if the queue is empty
   */
  @Override
  public E peek() {
    lock.lock();
    try {
      return queue.peek();
    } finally {
      lock.unlock();
    }
  }

  /**
   * @return the number of elements currently in the queue
   */
  @Override
  public int size() {
    lock.lock();
    try {
      return queue.size();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Iteration is not supported by this queue.
   * @throws UnsupportedOperationException always
   */
  @Override
  public Iterator<E> iterator() {
    throw new UnsupportedOperationException();
  }

  /**
   * @return the comparator used to order the elements of this queue
   */
  public Comparator<? super E> comparator() {
    return queue.comparator();
  }

  /**
   * @return the number of additional elements the queue can accept without blocking
   */
  @Override
  public int remainingCapacity() {
    lock.lock();
    try {
      return queue.remainingCapacity();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removal of arbitrary elements is not supported by this queue.
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean remove(Object o) {
    throw new UnsupportedOperationException();
  }

  /**
   * @param o the object to look for
   * @return true if the queue holds an element equal to {@code o}
   */
  @Override
  public boolean contains(Object o) {
    lock.lock();
    try {
      return queue.contains(o);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Removes all available elements from this queue, in priority order, and
   * adds them to the given collection.
   * @param c the collection to transfer elements into
   * @return the number of elements transferred
   */
  @Override
  public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
  }

  /**
   * Removes at most {@code maxElements} elements from this queue, in priority
   * order, and adds them to the given collection.
   * @param c the collection to transfer elements into
   * @param maxElements the maximum number of elements to transfer
   * @return the number of elements transferred
   * @throws NullPointerException if the collection is null
   * @throws IllegalArgumentException if the collection is this queue
   */
  @Override
  public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    lock.lock();
    try {
      int sizeBefore = queue.size();
      try {
        int n = Math.min(sizeBefore, maxElements);
        for (int i = 0; i < n; ++i) {
          c.add(queue.poll());
        }
        return n;
      } finally {
        // Several slots may have been freed at once, so wake every producer
        // blocked in put()/offer(timeout); each re-checks capacity itself.
        if (queue.size() < sizeBefore) {
          notFull.signalAll();
        }
      }
    } finally {
      lock.unlock();
    }
  }
}