View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.concurrent.BlockingQueue;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  
32  /**
33   * A BlockingQueue reports waiting time in queue and queue length to
34   * ThriftMetrics.
35   */
36  @InterfaceAudience.Private
37  public class CallQueue implements BlockingQueue<Runnable> {
38    private static final Log LOG = LogFactory.getLog(CallQueue.class);
39  
40    private final BlockingQueue<Call> underlyingQueue;
41    private final ThriftMetrics metrics;
42  
43    public CallQueue(BlockingQueue<Call> underlyingQueue,
44                     ThriftMetrics metrics) {
45      this.underlyingQueue = underlyingQueue;
46      this.metrics = metrics;
47    }
48  
49    private static long now() {
50      return System.nanoTime();
51    }
52  
53    public static class Call implements Runnable {
54      final long startTime;
55      final Runnable underlyingRunnable;
56  
57      Call(Runnable underlyingRunnable) {
58        this.underlyingRunnable = underlyingRunnable;
59        this.startTime = now();
60      }
61  
62      @Override
63      public void run() {
64        underlyingRunnable.run();
65      }
66  
67      public long timeInQueue() {
68        return now() - startTime;
69      }
70  
71      @Override
72      public boolean equals(Object other) {
73        if (other instanceof Call) {
74          Call otherCall = (Call)(other);
75          return this.underlyingRunnable.equals(otherCall.underlyingRunnable);
76        } else if (other instanceof Runnable) {
77          return this.underlyingRunnable.equals(other);
78        }
79        return false;
80      }
81  
82      @Override
83      public int hashCode() {
84        return this.underlyingRunnable.hashCode();
85      }
86    }
87  
88    @Override
89    public Runnable poll() {
90      Call result = underlyingQueue.poll();
91      updateMetrics(result);
92      return result;
93    }
94  
95    private void updateMetrics(Call result) {
96      if (result == null) {
97        return;
98      }
99      metrics.incTimeInQueue(result.timeInQueue());
100     metrics.setCallQueueLen(this.size());
101   }
102 
103   @Override
104   public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
105     Call result = underlyingQueue.poll(timeout, unit);
106     updateMetrics(result);
107     return result;
108   }
109 
110   @Override
111   public Runnable remove() {
112     Call result = underlyingQueue.remove();
113     updateMetrics(result);
114     return result;
115   }
116 
117   @Override
118   public Runnable take() throws InterruptedException {
119     Call result = underlyingQueue.take();
120     updateMetrics(result);
121     return result;
122   }
123 
124   @Override
125   public int drainTo(Collection<? super Runnable> destination) {
126     return drainTo(destination, Integer.MAX_VALUE);
127   }
128 
129   @Override
130   public int drainTo(Collection<? super Runnable> destination,
131                      int maxElements) {
132     if (destination == this) {
133       throw new IllegalArgumentException(
134           "A BlockingQueue cannot drain to itself.");
135     }
136     List<Call> drained = new ArrayList<Call>();
137     underlyingQueue.drainTo(drained, maxElements);
138     for (Call r : drained) {
139       updateMetrics(r);
140     }
141     destination.addAll(drained);
142     int sz = drained.size();
143     LOG.info("Elements drained: " + sz);
144     return sz;
145   }
146 
147 
148   @Override
149   public boolean offer(Runnable element) {
150     return underlyingQueue.offer(new Call(element));
151   }
152 
153   @Override
154   public boolean offer(Runnable element, long timeout, TimeUnit unit)
155       throws InterruptedException {
156     return underlyingQueue.offer(new Call(element), timeout, unit);
157   }
158   @Override
159   public void put(Runnable element) throws InterruptedException {
160     underlyingQueue.put(new Call(element));
161   }
162 
163   @Override
164   public boolean add(Runnable element) {
165     return underlyingQueue.add(new Call(element));
166   }
167 
168   @Override
169   public boolean addAll(Collection<? extends Runnable> elements) {
170     int added = 0;
171     for (Runnable r : elements) {
172       added += underlyingQueue.add(new Call(r)) ? 1 : 0;
173     }
174     return added != 0;
175   }
176 
177   @Override
178   public Runnable element() {
179     return underlyingQueue.element();
180   }
181 
182   @Override
183   public Runnable peek() {
184     return underlyingQueue.peek();
185   }
186 
187   @Override
188   public void clear() {
189     underlyingQueue.clear();
190   }
191 
192   @Override
193   public boolean containsAll(Collection<?> elements) {
194     return underlyingQueue.containsAll(elements);
195   }
196 
197   @Override
198   public boolean isEmpty() {
199     return underlyingQueue.isEmpty();
200   }
201 
202   @Override
203   public Iterator<Runnable> iterator() {
204     return new Iterator<Runnable>() {
205       final Iterator<Call> underlyingIterator = underlyingQueue.iterator();
206       @Override
207       public Runnable next() {
208         return underlyingIterator.next();
209       }
210 
211       @Override
212       public boolean hasNext() {
213         return underlyingIterator.hasNext();
214       }
215 
216       @Override
217       public void remove() {
218         underlyingIterator.remove();
219       }
220     };
221   }
222 
223   @Override
224   public boolean removeAll(Collection<?> elements) {
225     return underlyingQueue.removeAll(elements);
226   }
227 
228   @Override
229   public boolean retainAll(Collection<?> elements) {
230     return underlyingQueue.retainAll(elements);
231   }
232 
233   @Override
234   public int size() {
235     return underlyingQueue.size();
236   }
237 
238   @Override
239   public Object[] toArray() {
240     return underlyingQueue.toArray();
241   }
242 
243   @Override
244   public <T> T[] toArray(T[] array) {
245     return underlyingQueue.toArray(array);
246   }
247 
248   @Override
249   public boolean contains(Object element) {
250     return underlyingQueue.contains(element);
251   }
252 
253   @Override
254   public int remainingCapacity() {
255     return underlyingQueue.remainingCapacity();
256   }
257 
258   @Override
259   public boolean remove(Object element) {
260     return underlyingQueue.remove(element);
261   }
262 }