1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
29
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.classification.InterfaceStability;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 @InterfaceStability.Stable
50 public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
51 private static class PriorityQueue<E> {
52 private final Comparator<? super E> comparator;
53 private final E[] objects;
54
55 private int head = 0;
56 private int tail = 0;
57
58 @SuppressWarnings("unchecked")
59 public PriorityQueue(int capacity, Comparator<? super E> comparator) {
60 this.objects = (E[])new Object[capacity];
61 this.comparator = comparator;
62 }
63
64 public void add(E elem) {
65 if (tail == objects.length) {
66
67 tail -= head;
68 System.arraycopy(objects, head, objects, 0, tail);
69 head = 0;
70 }
71
72 if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) {
73
74 objects[tail++] = elem;
75 } else if (head > 0 && comparator.compare(objects[head], elem) > 0) {
76
77 objects[--head] = elem;
78 } else {
79
80 int index = upperBound(head, tail - 1, elem);
81 System.arraycopy(objects, index, objects, index + 1, tail - index);
82 objects[index] = elem;
83 tail++;
84 }
85 }
86
87 public E peek() {
88 return (head != tail) ? objects[head] : null;
89 }
90
91 public E poll() {
92 E elem = objects[head];
93 objects[head] = null;
94 head = (head + 1) % objects.length;
95 if (head == 0) tail = 0;
96 return elem;
97 }
98
99 public int size() {
100 return tail - head;
101 }
102
103 public Comparator<? super E> comparator() {
104 return this.comparator;
105 }
106
107 public boolean contains(Object o) {
108 for (int i = head; i < tail; ++i) {
109 if (objects[i] == o) {
110 return true;
111 }
112 }
113 return false;
114 }
115
116 public int remainingCapacity() {
117 return this.objects.length - (tail - head);
118 }
119
120 private int upperBound(int start, int end, E key) {
121 while (start < end) {
122 int mid = (start + end) >>> 1;
123 E mitem = objects[mid];
124 int cmp = comparator.compare(mitem, key);
125 if (cmp > 0) {
126 end = mid;
127 } else {
128 start = mid + 1;
129 }
130 }
131 return start;
132 }
133 }
134
135
136
137 private final ReentrantLock lock = new ReentrantLock();
138
139
140 private final Condition notEmpty = lock.newCondition();
141
142
143 private final Condition notFull = lock.newCondition();
144
145 private final PriorityQueue<E> queue;
146
147
148
149
150
151
152
153 public BoundedPriorityBlockingQueue(int capacity,
154 Comparator<? super E> comparator) {
155 this.queue = new PriorityQueue<E>(capacity, comparator);
156 }
157
158 public boolean offer(E e) {
159 if (e == null) throw new NullPointerException();
160
161 lock.lock();
162 try {
163 if (queue.remainingCapacity() > 0) {
164 this.queue.add(e);
165 notEmpty.signal();
166 return true;
167 }
168 } finally {
169 lock.unlock();
170 }
171 return false;
172 }
173
174 public void put(E e) throws InterruptedException {
175 if (e == null) throw new NullPointerException();
176
177 lock.lock();
178 try {
179 while (queue.remainingCapacity() == 0) {
180 notFull.await();
181 }
182 this.queue.add(e);
183 notEmpty.signal();
184 } finally {
185 lock.unlock();
186 }
187 }
188
189 public boolean offer(E e, long timeout, TimeUnit unit)
190 throws InterruptedException {
191 if (e == null) throw new NullPointerException();
192 long nanos = unit.toNanos(timeout);
193
194 lock.lockInterruptibly();
195 try {
196 while (queue.remainingCapacity() == 0) {
197 if (nanos <= 0)
198 return false;
199 nanos = notFull.awaitNanos(nanos);
200 }
201 this.queue.add(e);
202 notEmpty.signal();
203 } finally {
204 lock.unlock();
205 }
206 return true;
207 }
208
209 public E take() throws InterruptedException {
210 E result = null;
211 lock.lockInterruptibly();
212 try {
213 while (queue.size() == 0) {
214 notEmpty.await();
215 }
216 result = queue.poll();
217 notFull.signal();
218 } finally {
219 lock.unlock();
220 }
221 return result;
222 }
223
224 public E poll() {
225 E result = null;
226 lock.lock();
227 try {
228 if (queue.size() > 0) {
229 result = queue.poll();
230 notFull.signal();
231 }
232 } finally {
233 lock.unlock();
234 }
235 return result;
236 }
237
238 public E poll(long timeout, TimeUnit unit)
239 throws InterruptedException {
240 long nanos = unit.toNanos(timeout);
241 lock.lockInterruptibly();
242 E result = null;
243 try {
244 while (queue.size() == 0 && nanos > 0) {
245 nanos = notEmpty.awaitNanos(nanos);
246 }
247 if (queue.size() > 0) {
248 result = queue.poll();
249 }
250 notFull.signal();
251 } finally {
252 lock.unlock();
253 }
254 return result;
255 }
256
257 public E peek() {
258 lock.lock();
259 try {
260 return queue.peek();
261 } finally {
262 lock.unlock();
263 }
264 }
265
266 public int size() {
267 lock.lock();
268 try {
269 return queue.size();
270 } finally {
271 lock.unlock();
272 }
273 }
274
275 public Iterator<E> iterator() {
276 throw new UnsupportedOperationException();
277 }
278
279 public Comparator<? super E> comparator() {
280 return queue.comparator();
281 }
282
283 public int remainingCapacity() {
284 lock.lock();
285 try {
286 return queue.remainingCapacity();
287 } finally {
288 lock.unlock();
289 }
290 }
291
292 public boolean remove(Object o) {
293 throw new UnsupportedOperationException();
294 }
295
296 public boolean contains(Object o) {
297 lock.lock();
298 try {
299 return queue.contains(o);
300 } finally {
301 lock.unlock();
302 }
303 }
304
305 public int drainTo(Collection<? super E> c) {
306 return drainTo(c, Integer.MAX_VALUE);
307 }
308
309 public int drainTo(Collection<? super E> c, int maxElements) {
310 if (c == null)
311 throw new NullPointerException();
312 if (c == this)
313 throw new IllegalArgumentException();
314 if (maxElements <= 0)
315 return 0;
316 lock.lock();
317 try {
318 int n = Math.min(queue.size(), maxElements);
319 for (int i = 0; i < n; ++i) {
320 c.add(queue.poll());
321 }
322 return n;
323 } finally {
324 lock.unlock();
325 }
326 }
327 }