1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift;
20
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.TimeUnit;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31
32
33
34
35
36 @InterfaceAudience.Private
37 public class CallQueue implements BlockingQueue<Runnable> {
38 private static final Log LOG = LogFactory.getLog(CallQueue.class);
39
40 private final BlockingQueue<Call> underlyingQueue;
41 private final ThriftMetrics metrics;
42
43 public CallQueue(BlockingQueue<Call> underlyingQueue,
44 ThriftMetrics metrics) {
45 this.underlyingQueue = underlyingQueue;
46 this.metrics = metrics;
47 }
48
49 private static long now() {
50 return System.nanoTime();
51 }
52
53 public static class Call implements Runnable {
54 final long startTime;
55 final Runnable underlyingRunnable;
56
57 Call(Runnable underlyingRunnable) {
58 this.underlyingRunnable = underlyingRunnable;
59 this.startTime = now();
60 }
61
62 @Override
63 public void run() {
64 underlyingRunnable.run();
65 }
66
67 public long timeInQueue() {
68 return now() - startTime;
69 }
70
71 @Override
72 public boolean equals(Object other) {
73 if (other instanceof Call) {
74 Call otherCall = (Call)(other);
75 return this.underlyingRunnable.equals(otherCall.underlyingRunnable);
76 } else if (other instanceof Runnable) {
77 return this.underlyingRunnable.equals(other);
78 }
79 return false;
80 }
81
82 @Override
83 public int hashCode() {
84 return this.underlyingRunnable.hashCode();
85 }
86 }
87
88 @Override
89 public Runnable poll() {
90 Call result = underlyingQueue.poll();
91 updateMetrics(result);
92 return result;
93 }
94
95 private void updateMetrics(Call result) {
96 if (result == null) {
97 return;
98 }
99 metrics.incTimeInQueue(result.timeInQueue());
100 metrics.setCallQueueLen(this.size());
101 }
102
103 @Override
104 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
105 Call result = underlyingQueue.poll(timeout, unit);
106 updateMetrics(result);
107 return result;
108 }
109
110 @Override
111 public Runnable remove() {
112 Call result = underlyingQueue.remove();
113 updateMetrics(result);
114 return result;
115 }
116
117 @Override
118 public Runnable take() throws InterruptedException {
119 Call result = underlyingQueue.take();
120 updateMetrics(result);
121 return result;
122 }
123
124 @Override
125 public int drainTo(Collection<? super Runnable> destination) {
126 return drainTo(destination, Integer.MAX_VALUE);
127 }
128
129 @Override
130 public int drainTo(Collection<? super Runnable> destination,
131 int maxElements) {
132 if (destination == this) {
133 throw new IllegalArgumentException(
134 "A BlockingQueue cannot drain to itself.");
135 }
136 List<Call> drained = new ArrayList<Call>();
137 underlyingQueue.drainTo(drained, maxElements);
138 for (Call r : drained) {
139 updateMetrics(r);
140 }
141 destination.addAll(drained);
142 int sz = drained.size();
143 LOG.info("Elements drained: " + sz);
144 return sz;
145 }
146
147
148 @Override
149 public boolean offer(Runnable element) {
150 return underlyingQueue.offer(new Call(element));
151 }
152
153 @Override
154 public boolean offer(Runnable element, long timeout, TimeUnit unit)
155 throws InterruptedException {
156 return underlyingQueue.offer(new Call(element), timeout, unit);
157 }
158 @Override
159 public void put(Runnable element) throws InterruptedException {
160 underlyingQueue.put(new Call(element));
161 }
162
163 @Override
164 public boolean add(Runnable element) {
165 return underlyingQueue.add(new Call(element));
166 }
167
168 @Override
169 public boolean addAll(Collection<? extends Runnable> elements) {
170 int added = 0;
171 for (Runnable r : elements) {
172 added += underlyingQueue.add(new Call(r)) ? 1 : 0;
173 }
174 return added != 0;
175 }
176
177 @Override
178 public Runnable element() {
179 return underlyingQueue.element();
180 }
181
182 @Override
183 public Runnable peek() {
184 return underlyingQueue.peek();
185 }
186
187 @Override
188 public void clear() {
189 underlyingQueue.clear();
190 }
191
192 @Override
193 public boolean containsAll(Collection<?> elements) {
194 return underlyingQueue.containsAll(elements);
195 }
196
197 @Override
198 public boolean isEmpty() {
199 return underlyingQueue.isEmpty();
200 }
201
202 @Override
203 public Iterator<Runnable> iterator() {
204 return new Iterator<Runnable>() {
205 final Iterator<Call> underlyingIterator = underlyingQueue.iterator();
206 @Override
207 public Runnable next() {
208 return underlyingIterator.next();
209 }
210
211 @Override
212 public boolean hasNext() {
213 return underlyingIterator.hasNext();
214 }
215
216 @Override
217 public void remove() {
218 underlyingIterator.remove();
219 }
220 };
221 }
222
223 @Override
224 public boolean removeAll(Collection<?> elements) {
225 return underlyingQueue.removeAll(elements);
226 }
227
228 @Override
229 public boolean retainAll(Collection<?> elements) {
230 return underlyingQueue.retainAll(elements);
231 }
232
233 @Override
234 public int size() {
235 return underlyingQueue.size();
236 }
237
238 @Override
239 public Object[] toArray() {
240 return underlyingQueue.toArray();
241 }
242
243 @Override
244 public <T> T[] toArray(T[] array) {
245 return underlyingQueue.toArray(array);
246 }
247
248 @Override
249 public boolean contains(Object element) {
250 return underlyingQueue.contains(element);
251 }
252
253 @Override
254 public int remainingCapacity() {
255 return underlyingQueue.remainingCapacity();
256 }
257
258 @Override
259 public boolean remove(Object element) {
260 return underlyingQueue.remove(element);
261 }
262 }