1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.util;
20
21 import java.util.concurrent.locks.Condition;
22 import java.util.concurrent.locks.ReentrantLock;
23 import java.util.concurrent.TimeUnit;
24
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27
28 @InterfaceAudience.Private
29 @InterfaceStability.Evolving
30 public class TimeoutBlockingQueue<E> {
31 public static interface TimeoutRetriever<T> {
32 long getTimeout(T object);
33 TimeUnit getTimeUnit(T object);
34 }
35
36 private final ReentrantLock lock = new ReentrantLock();
37 private final Condition waitCond = lock.newCondition();
38 private final TimeoutRetriever<? super E> timeoutRetriever;
39
40 private E[] objects;
41 private int head = 0;
42 private int tail = 0;
43
44 public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) {
45 this(32, timeoutRetriever);
46 }
47
48 @SuppressWarnings("unchecked")
49 public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> timeoutRetriever) {
50 this.objects = (E[])new Object[capacity];
51 this.timeoutRetriever = timeoutRetriever;
52 }
53
54 public void dump() {
55 for (int i = 0; i < objects.length; ++i) {
56 if (i == head) {
57 System.out.print("[" + objects[i] + "] ");
58 } else if (i == tail) {
59 System.out.print("]" + objects[i] + "[ ");
60 } else {
61 System.out.print(objects[i] + " ");
62 }
63 }
64 System.out.println();
65 }
66
67 public void clear() {
68 lock.lock();
69 try {
70 if (head != tail) {
71 for (int i = head; i < tail; ++i) {
72 objects[i] = null;
73 }
74 head = 0;
75 tail = 0;
76 waitCond.signal();
77 }
78 } finally {
79 lock.unlock();
80 }
81 }
82
83 public void add(E e) {
84 if (e == null) throw new NullPointerException();
85
86 lock.lock();
87 try {
88 addElement(e);
89 waitCond.signal();
90 } finally {
91 lock.unlock();
92 }
93 }
94
95 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
96 public E poll() {
97 lock.lock();
98 try {
99 if (isEmpty()) {
100 waitCond.await();
101 return null;
102 }
103
104 E elem = objects[head];
105 long nanos = getNanosTimeout(elem);
106 nanos = waitCond.awaitNanos(nanos);
107 return nanos > 0 ? null : removeFirst();
108 } catch (InterruptedException e) {
109 Thread.currentThread().interrupt();
110 return null;
111 } finally {
112 lock.unlock();
113 }
114 }
115
116 public int size() {
117 return tail - head;
118 }
119
120 public boolean isEmpty() {
121 return (tail - head) == 0;
122 }
123
124 public void signalAll() {
125 lock.lock();
126 try {
127 waitCond.signalAll();
128 } finally {
129 lock.unlock();
130 }
131 }
132
133 private void addElement(E elem) {
134 int size = (tail - head);
135 if ((objects.length - size) == 0) {
136 int capacity = size + ((size < 64) ? (size + 2) : (size >> 1));
137 E[] newObjects = (E[])new Object[capacity];
138
139 if (compareTimeouts(objects[tail - 1], elem) <= 0) {
140
141 System.arraycopy(objects, head, newObjects, 0, tail);
142 tail -= head;
143 newObjects[tail++] = elem;
144 } else if (compareTimeouts(objects[head], elem) > 0) {
145
146 System.arraycopy(objects, head, newObjects, 1, tail);
147 newObjects[0] = elem;
148 tail -= (head - 1);
149 } else {
150
151 int index = upperBound(head, tail - 1, elem);
152 int newIndex = (index - head);
153 System.arraycopy(objects, head, newObjects, 0, newIndex);
154 newObjects[newIndex] = elem;
155 System.arraycopy(objects, index, newObjects, newIndex + 1, tail - index);
156 tail -= (head - 1);
157 }
158 head = 0;
159 objects = newObjects;
160 } else {
161 if (tail == objects.length) {
162
163 tail -= head;
164 System.arraycopy(objects, head, objects, 0, tail);
165 head = 0;
166 }
167
168 if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) {
169
170 objects[tail++] = elem;
171 } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) {
172
173 objects[--head] = elem;
174 } else {
175
176 int index = upperBound(head, tail - 1, elem);
177 System.arraycopy(objects, index, objects, index + 1, tail - index);
178 objects[index] = elem;
179 tail++;
180 }
181 }
182 }
183
184 private E removeFirst() {
185 E elem = objects[head];
186 objects[head] = null;
187 head = (head + 1) % objects.length;
188 if (head == 0) tail = 0;
189 return elem;
190 }
191
192 private int upperBound(int start, int end, E key) {
193 while (start < end) {
194 int mid = (start + end) >>> 1;
195 E mitem = objects[mid];
196 int cmp = compareTimeouts(mitem, key);
197 if (cmp > 0) {
198 end = mid;
199 } else {
200 start = mid + 1;
201 }
202 }
203 return start;
204 }
205
206 private int compareTimeouts(final E a, final E b) {
207 long t1 = getNanosTimeout(a);
208 long t2 = getNanosTimeout(b);
209 return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0;
210 }
211
212 private long getNanosTimeout(final E obj) {
213 TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
214 long timeout = timeoutRetriever.getTimeout(obj);
215 return unit.toNanos(timeout);
216 }
217 }