001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.thrift;
020
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Iterator;
024import java.util.List;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.TimeUnit;
027
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * A BlockingQueue reports waiting time in queue and queue length to
034 * ThriftMetrics.
035 */
036@InterfaceAudience.Private
037public class CallQueue implements BlockingQueue<Runnable> {
038  private static final Logger LOG = LoggerFactory.getLogger(CallQueue.class);
039
040  private final BlockingQueue<Call> underlyingQueue;
041  private final ThriftMetrics metrics;
042
043  public CallQueue(BlockingQueue<Call> underlyingQueue,
044                   ThriftMetrics metrics) {
045    this.underlyingQueue = underlyingQueue;
046    this.metrics = metrics;
047  }
048
049  private static long now() {
050    return System.nanoTime();
051  }
052
053  public static class Call implements Runnable {
054    final long startTime;
055    final Runnable underlyingRunnable;
056
057    Call(Runnable underlyingRunnable) {
058      this.underlyingRunnable = underlyingRunnable;
059      this.startTime = now();
060    }
061
062    @Override
063    public void run() {
064      underlyingRunnable.run();
065    }
066
067    public long timeInQueue() {
068      return now() - startTime;
069    }
070
071    @Override
072    public boolean equals(Object other) {
073      if (other instanceof Call) {
074        Call otherCall = (Call)(other);
075        return this.underlyingRunnable.equals(otherCall.underlyingRunnable);
076      } else if (other instanceof Runnable) {
077        return this.underlyingRunnable.equals(other);
078      }
079      return false;
080    }
081
082    @Override
083    public int hashCode() {
084      return this.underlyingRunnable.hashCode();
085    }
086  }
087
088  @Override
089  public Runnable poll() {
090    Call result = underlyingQueue.poll();
091    updateMetrics(result);
092    return result;
093  }
094
095  private void updateMetrics(Call result) {
096    if (result == null) {
097      return;
098    }
099    metrics.incTimeInQueue(result.timeInQueue());
100    metrics.setCallQueueLen(this.size());
101  }
102
103  @Override
104  public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
105    Call result = underlyingQueue.poll(timeout, unit);
106    updateMetrics(result);
107    return result;
108  }
109
110  @Override
111  public Runnable remove() {
112    Call result = underlyingQueue.remove();
113    updateMetrics(result);
114    return result;
115  }
116
117  @Override
118  public Runnable take() throws InterruptedException {
119    Call result = underlyingQueue.take();
120    updateMetrics(result);
121    return result;
122  }
123
124  @Override
125  public int drainTo(Collection<? super Runnable> destination) {
126    return drainTo(destination, Integer.MAX_VALUE);
127  }
128
129  @Override
130  public int drainTo(Collection<? super Runnable> destination,
131                     int maxElements) {
132    if (destination == this) {
133      throw new IllegalArgumentException(
134          "A BlockingQueue cannot drain to itself.");
135    }
136    List<Call> drained = new ArrayList<>();
137    underlyingQueue.drainTo(drained, maxElements);
138    for (Call r : drained) {
139      updateMetrics(r);
140    }
141    destination.addAll(drained);
142    int sz = drained.size();
143    LOG.info("Elements drained: " + sz);
144    return sz;
145  }
146
147
148  @Override
149  public boolean offer(Runnable element) {
150    return underlyingQueue.offer(new Call(element));
151  }
152
153  @Override
154  public boolean offer(Runnable element, long timeout, TimeUnit unit)
155      throws InterruptedException {
156    return underlyingQueue.offer(new Call(element), timeout, unit);
157  }
158  @Override
159  public void put(Runnable element) throws InterruptedException {
160    underlyingQueue.put(new Call(element));
161  }
162
163  @Override
164  public boolean add(Runnable element) {
165    return underlyingQueue.add(new Call(element));
166  }
167
168  @Override
169  public boolean addAll(Collection<? extends Runnable> elements) {
170    int added = 0;
171    for (Runnable r : elements) {
172      added += underlyingQueue.add(new Call(r)) ? 1 : 0;
173    }
174    return added != 0;
175  }
176
177  @Override
178  public Runnable element() {
179    return underlyingQueue.element();
180  }
181
182  @Override
183  public Runnable peek() {
184    return underlyingQueue.peek();
185  }
186
187  @Override
188  public void clear() {
189    underlyingQueue.clear();
190  }
191
192  @Override
193  public boolean containsAll(Collection<?> elements) {
194    return underlyingQueue.containsAll(elements);
195  }
196
197  @Override
198  public boolean isEmpty() {
199    return underlyingQueue.isEmpty();
200  }
201
202  @Override
203  public Iterator<Runnable> iterator() {
204    return new Iterator<Runnable>() {
205      final Iterator<Call> underlyingIterator = underlyingQueue.iterator();
206      @Override
207      public Runnable next() {
208        return underlyingIterator.next();
209      }
210
211      @Override
212      public boolean hasNext() {
213        return underlyingIterator.hasNext();
214      }
215
216      @Override
217      public void remove() {
218        underlyingIterator.remove();
219      }
220    };
221  }
222
223  @Override
224  public boolean removeAll(Collection<?> elements) {
225    return underlyingQueue.removeAll(elements);
226  }
227
228  @Override
229  public boolean retainAll(Collection<?> elements) {
230    return underlyingQueue.retainAll(elements);
231  }
232
233  @Override
234  public int size() {
235    return underlyingQueue.size();
236  }
237
238  @Override
239  public Object[] toArray() {
240    return underlyingQueue.toArray();
241  }
242
243  @Override
244  public <T> T[] toArray(T[] array) {
245    return underlyingQueue.toArray(array);
246  }
247
248  @Override
249  public boolean contains(Object element) {
250    return underlyingQueue.contains(element);
251  }
252
253  @Override
254  public int remainingCapacity() {
255    return underlyingQueue.remainingCapacity();
256  }
257
258  @Override
259  public boolean remove(Object element) {
260    return underlyingQueue.remove(element);
261  }
262}