View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.concurrent.BlockingQueue;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  
31  /**
32   * A BlockingQueue reports waiting time in queue and queue length to
33   * ThriftMetrics.
34   */
35  public class CallQueue implements BlockingQueue<Runnable> {
36    private static Log LOG = LogFactory.getLog(CallQueue.class);
37  
38    private final BlockingQueue<Call> underlyingQueue;
39    private final ThriftMetrics metrics;
40  
41    public CallQueue(BlockingQueue<Call> underlyingQueue,
42                     ThriftMetrics metrics) {
43      this.underlyingQueue = underlyingQueue;
44      this.metrics = metrics;
45    }
46  
47    private static long now() {
48      return System.nanoTime();
49    }
50  
51    public static class Call implements Runnable {
52      final long startTime;
53      final Runnable underlyingRunnable;
54  
55      Call(Runnable underlyingRunnable) {
56        this.underlyingRunnable = underlyingRunnable;
57        this.startTime = now();
58      }
59  
60      @Override
61      public void run() {
62        underlyingRunnable.run();
63      }
64  
65      public long timeInQueue() {
66        return now() - startTime;
67      }
68  
69      @Override
70      public boolean equals(Object other) {
71        if (other instanceof Call) {
72          Call otherCall = (Call)(other);
73          return this.underlyingRunnable.equals(otherCall.underlyingRunnable);
74        } else if (other instanceof Runnable) {
75          return this.underlyingRunnable.equals(other);
76        }
77        return false;
78      }
79  
80      @Override
81      public int hashCode() {
82        return this.underlyingRunnable.hashCode();
83      }
84    }
85  
86    @Override
87    public Runnable poll() {
88      Call result = underlyingQueue.poll();
89      updateMetrics(result);
90      return result;
91    }
92  
93    private void updateMetrics(Call result) {
94      if (result == null) {
95        return;
96      }
97      metrics.incTimeInQueue(result.timeInQueue());
98      metrics.setCallQueueLen(this.size());
99    }
100 
101   @Override
102   public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
103     Call result = underlyingQueue.poll(timeout, unit);
104     updateMetrics(result);
105     return result;
106   }
107 
108   @Override
109   public Runnable remove() {
110     Call result = underlyingQueue.remove();
111     updateMetrics(result);
112     return result;
113   }
114 
115   @Override
116   public Runnable take() throws InterruptedException {
117     Call result = underlyingQueue.take();
118     updateMetrics(result);
119     return result;
120   }
121 
122   @Override
123   public int drainTo(Collection<? super Runnable> destination) {
124     return drainTo(destination, Integer.MAX_VALUE);
125   }
126 
127   @Override
128   public int drainTo(Collection<? super Runnable> destination,
129                      int maxElements) {
130     if (destination == this) {
131       throw new IllegalArgumentException(
132           "A BlockingQueue cannot drain to itself.");
133     }
134     List<Call> drained = new ArrayList<Call>();
135     underlyingQueue.drainTo(drained, maxElements);
136     for (Call r : drained) {
137       updateMetrics(r);
138     }
139     destination.addAll(drained);
140     int sz = drained.size();
141     LOG.info("Elements drained: " + sz);
142     return sz;
143   }
144 
145 
146   @Override
147   public boolean offer(Runnable element) {
148     return underlyingQueue.offer(new Call(element));
149   }
150 
151   @Override
152   public boolean offer(Runnable element, long timeout, TimeUnit unit)
153       throws InterruptedException {
154     return underlyingQueue.offer(new Call(element), timeout, unit);
155   }
156   @Override
157   public void put(Runnable element) throws InterruptedException {
158     underlyingQueue.put(new Call(element));
159   }
160 
161   @Override
162   public boolean add(Runnable element) {
163     return underlyingQueue.add(new Call(element));
164   }
165 
166   @Override
167   public boolean addAll(Collection<? extends Runnable> elements) {
168     int added = 0;
169     for (Runnable r : elements) {
170       added += underlyingQueue.add(new Call(r)) ? 1 : 0;
171     }
172     return added != 0;
173   }
174 
175   @Override
176   public Runnable element() {
177     return underlyingQueue.element();
178   }
179 
180   @Override
181   public Runnable peek() {
182     return underlyingQueue.peek();
183   }
184 
185   @Override
186   public void clear() {
187     underlyingQueue.clear();
188   }
189 
190   @Override
191   public boolean containsAll(Collection<?> elements) {
192     return underlyingQueue.containsAll(elements);
193   }
194 
195   @Override
196   public boolean isEmpty() {
197     return underlyingQueue.isEmpty();
198   }
199 
200   @Override
201   public Iterator<Runnable> iterator() {
202     return new Iterator<Runnable>() {
203       final Iterator<Call> underlyingIterator = underlyingQueue.iterator();
204       @Override
205       public Runnable next() {
206         return underlyingIterator.next();
207       }
208 
209       @Override
210       public boolean hasNext() {
211         return underlyingIterator.hasNext();
212       }
213 
214       @Override
215       public void remove() {
216         underlyingIterator.remove();
217       }
218     };
219   }
220 
221   @Override
222   public boolean removeAll(Collection<?> elements) {
223     return underlyingQueue.removeAll(elements);
224   }
225 
226   @Override
227   public boolean retainAll(Collection<?> elements) {
228     return underlyingQueue.retainAll(elements);
229   }
230 
231   @Override
232   public int size() {
233     return underlyingQueue.size();
234   }
235 
236   @Override
237   public Object[] toArray() {
238     return underlyingQueue.toArray();
239   }
240 
241   @Override
242   public <T> T[] toArray(T[] array) {
243     return underlyingQueue.toArray(array);
244   }
245 
246   @Override
247   public boolean contains(Object element) {
248     return underlyingQueue.contains(element);
249   }
250 
251   @Override
252   public int remainingCapacity() {
253     return underlyingQueue.remainingCapacity();
254   }
255 
256   @Override
257   public boolean remove(Object element) {
258     return underlyingQueue.remove(element);
259   }
260 }