001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.Iterator;
023import java.util.List;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.TimeUnit;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * A BlockingQueue reports waiting time in queue and queue length to ThriftMetrics.
032 */
033@InterfaceAudience.Private
034public class CallQueue implements BlockingQueue<Runnable> {
035  private static final Logger LOG = LoggerFactory.getLogger(CallQueue.class);
036
037  private final BlockingQueue<Call> underlyingQueue;
038  private final ThriftMetrics metrics;
039
040  public CallQueue(BlockingQueue<Call> underlyingQueue, ThriftMetrics metrics) {
041    this.underlyingQueue = underlyingQueue;
042    this.metrics = metrics;
043  }
044
045  private static long now() {
046    return System.nanoTime();
047  }
048
049  public static class Call implements Runnable {
050    final long startTime;
051    final Runnable underlyingRunnable;
052
053    Call(Runnable underlyingRunnable) {
054      this.underlyingRunnable = underlyingRunnable;
055      this.startTime = now();
056    }
057
058    @Override
059    public void run() {
060      underlyingRunnable.run();
061    }
062
063    public long timeInQueue() {
064      return now() - startTime;
065    }
066
067    @Override
068    public boolean equals(Object other) {
069      if (other instanceof Call) {
070        Call otherCall = (Call) (other);
071        return this.underlyingRunnable.equals(otherCall.underlyingRunnable);
072      } else if (other instanceof Runnable) {
073        return this.underlyingRunnable.equals(other);
074      }
075      return false;
076    }
077
078    @Override
079    public int hashCode() {
080      return this.underlyingRunnable.hashCode();
081    }
082  }
083
084  @Override
085  public Runnable poll() {
086    Call result = underlyingQueue.poll();
087    updateMetrics(result);
088    return result;
089  }
090
091  private void updateMetrics(Call result) {
092    if (result == null) {
093      return;
094    }
095    metrics.incTimeInQueue(result.timeInQueue());
096    metrics.setCallQueueLen(this.size());
097  }
098
099  @Override
100  public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
101    Call result = underlyingQueue.poll(timeout, unit);
102    updateMetrics(result);
103    return result;
104  }
105
106  @Override
107  public Runnable remove() {
108    Call result = underlyingQueue.remove();
109    updateMetrics(result);
110    return result;
111  }
112
113  @Override
114  public Runnable take() throws InterruptedException {
115    Call result = underlyingQueue.take();
116    updateMetrics(result);
117    return result;
118  }
119
120  @Override
121  public int drainTo(Collection<? super Runnable> destination) {
122    return drainTo(destination, Integer.MAX_VALUE);
123  }
124
125  @Override
126  public int drainTo(Collection<? super Runnable> destination, int maxElements) {
127    if (destination == this) {
128      throw new IllegalArgumentException("A BlockingQueue cannot drain to itself.");
129    }
130    List<Call> drained = new ArrayList<>();
131    underlyingQueue.drainTo(drained, maxElements);
132    for (Call r : drained) {
133      updateMetrics(r);
134    }
135    destination.addAll(drained);
136    int sz = drained.size();
137    LOG.info("Elements drained: " + sz);
138    return sz;
139  }
140
141  @Override
142  public boolean offer(Runnable element) {
143    return underlyingQueue.offer(new Call(element));
144  }
145
146  @Override
147  public boolean offer(Runnable element, long timeout, TimeUnit unit) throws InterruptedException {
148    return underlyingQueue.offer(new Call(element), timeout, unit);
149  }
150
151  @Override
152  public void put(Runnable element) throws InterruptedException {
153    underlyingQueue.put(new Call(element));
154  }
155
156  @Override
157  public boolean add(Runnable element) {
158    return underlyingQueue.add(new Call(element));
159  }
160
161  @Override
162  public boolean addAll(Collection<? extends Runnable> elements) {
163    int added = 0;
164    for (Runnable r : elements) {
165      added += underlyingQueue.add(new Call(r)) ? 1 : 0;
166    }
167    return added != 0;
168  }
169
170  @Override
171  public Runnable element() {
172    return underlyingQueue.element();
173  }
174
175  @Override
176  public Runnable peek() {
177    return underlyingQueue.peek();
178  }
179
180  @Override
181  public void clear() {
182    underlyingQueue.clear();
183  }
184
185  @Override
186  public boolean containsAll(Collection<?> elements) {
187    return underlyingQueue.containsAll(elements);
188  }
189
190  @Override
191  public boolean isEmpty() {
192    return underlyingQueue.isEmpty();
193  }
194
195  @Override
196  public Iterator<Runnable> iterator() {
197    return new Iterator<Runnable>() {
198      final Iterator<Call> underlyingIterator = underlyingQueue.iterator();
199
200      @Override
201      public Runnable next() {
202        return underlyingIterator.next();
203      }
204
205      @Override
206      public boolean hasNext() {
207        return underlyingIterator.hasNext();
208      }
209
210      @Override
211      public void remove() {
212        underlyingIterator.remove();
213      }
214    };
215  }
216
217  @Override
218  public boolean removeAll(Collection<?> elements) {
219    return underlyingQueue.removeAll(elements);
220  }
221
222  @Override
223  public boolean retainAll(Collection<?> elements) {
224    return underlyingQueue.retainAll(elements);
225  }
226
227  @Override
228  public int size() {
229    return underlyingQueue.size();
230  }
231
232  @Override
233  public Object[] toArray() {
234    return underlyingQueue.toArray();
235  }
236
237  @Override
238  public <T> T[] toArray(T[] array) {
239    return underlyingQueue.toArray(array);
240  }
241
242  @Override
243  public boolean contains(Object element) {
244    return underlyingQueue.contains(element);
245  }
246
247  @Override
248  public int remainingCapacity() {
249    return underlyingQueue.remainingCapacity();
250  }
251
252  @Override
253  public boolean remove(Object element) {
254    return underlyingQueue.remove(element);
255  }
256}