001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.Collection;
021import java.util.Iterator;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.LinkedBlockingDeque;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.LongAdder;
028
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031
032/**
033 * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading.
034 *
035 * Implementing {@link BlockingQueue} interface to be compatible with {@link RpcExecutor}.
036 *
037 * Currently uses milliseconds internally, need to look into whether we should use
038 * nanoseconds for timeInterval and minDelay.
039 *
040 * @see <a href="http://queue.acm.org/detail.cfm?id=2839461">Fail at Scale paper</a>
041 *
042 * @see <a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/Codel.cpp">
043 *   CoDel version for generic job queues in Wangle library</a>
044 */
045@InterfaceAudience.Private
046public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
047
048  // backing queue
049  private LinkedBlockingDeque<CallRunner> queue;
050
051  // so we can calculate actual threshold to switch to LIFO under load
052  private int maxCapacity;
053
054  // metrics (shared across all queues)
055  private LongAdder numGeneralCallsDropped;
056  private LongAdder numLifoModeSwitches;
057
058  // Both are in milliseconds
059  private volatile int codelTargetDelay;
060  private volatile int codelInterval;
061
062  // if queue if full more than that percent, we switch to LIFO mode.
063  // Values are in the range of 0.7, 0.8 etc (0-1.0).
064  private volatile double lifoThreshold;
065
066  // minimal delay observed during the interval
067  private volatile long minDelay;
068
069  // the moment when current interval ends
070  private volatile long intervalTime = EnvironmentEdgeManager.currentTime();
071
072  // switch to ensure only one threads does interval cutoffs
073  private AtomicBoolean resetDelay = new AtomicBoolean(true);
074
075  // if we're in this mode, "long" calls are getting dropped
076  private AtomicBoolean isOverloaded = new AtomicBoolean(false);
077
078  public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
079      double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches) {
080    this.maxCapacity = capacity;
081    this.queue = new LinkedBlockingDeque<>(capacity);
082    this.codelTargetDelay = targetDelay;
083    this.codelInterval = interval;
084    this.lifoThreshold = lifoThreshold;
085    this.numGeneralCallsDropped = numGeneralCallsDropped;
086    this.numLifoModeSwitches = numLifoModeSwitches;
087  }
088
089  /**
090   * Update tunables.
091   *
092   * @param newCodelTargetDelay new CoDel target delay
093   * @param newCodelInterval new CoDel interval
094   * @param newLifoThreshold new Adaptive Lifo threshold
095   */
096  public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
097                             double newLifoThreshold) {
098    this.codelTargetDelay = newCodelTargetDelay;
099    this.codelInterval = newCodelInterval;
100    this.lifoThreshold = newLifoThreshold;
101  }
102
103  /**
104   * Behaves as {@link LinkedBlockingQueue#take()}, except it will silently
105   * skip all calls which it thinks should be dropped.
106   *
107   * @return the head of this queue
108   * @throws InterruptedException if interrupted while waiting
109   */
110  @Override
111  public CallRunner take() throws InterruptedException {
112    CallRunner cr;
113    while(true) {
114      if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
115        numLifoModeSwitches.increment();
116        cr = queue.takeLast();
117      } else {
118        cr = queue.takeFirst();
119      }
120      if (needToDrop(cr)) {
121        numGeneralCallsDropped.increment();
122        cr.drop();
123      } else {
124        return cr;
125      }
126    }
127  }
128
129  @Override
130  public CallRunner poll() {
131    CallRunner cr;
132    boolean switched = false;
133    while(true) {
134      if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
135        // Only count once per switch.
136        if (!switched) {
137          switched = true;
138          numLifoModeSwitches.increment();
139        }
140        cr = queue.pollLast();
141      } else {
142        switched = false;
143        cr = queue.pollFirst();
144      }
145      if (cr == null) {
146        return cr;
147      }
148      if (needToDrop(cr)) {
149        numGeneralCallsDropped.increment();
150        cr.drop();
151      } else {
152        return cr;
153      }
154    }
155  }
156
157  /**
158   * @param callRunner to validate
159   * @return true if this call needs to be skipped based on call timestamp
160   *   and internal queue state (deemed overloaded).
161   */
162  private boolean needToDrop(CallRunner callRunner) {
163    long now = EnvironmentEdgeManager.currentTime();
164    long callDelay = now - callRunner.getRpcCall().getReceiveTime();
165
166    long localMinDelay = this.minDelay;
167
168    // Try and determine if we should reset
169    // the delay time and determine overload
170    if (now > intervalTime &&
171        !resetDelay.get() &&
172        !resetDelay.getAndSet(true)) {
173      intervalTime = now + codelInterval;
174
175      isOverloaded.set(localMinDelay > codelTargetDelay);
176    }
177
178    // If it looks like we should reset the delay
179    // time do it only once on one thread
180    if (resetDelay.get() && resetDelay.getAndSet(false)) {
181      minDelay = callDelay;
182      // we just reset the delay dunno about how this will work
183      return false;
184    } else if (callDelay < localMinDelay) {
185      minDelay = callDelay;
186    }
187
188    return isOverloaded.get() && callDelay > 2 * codelTargetDelay;
189  }
190
191  // Generic BlockingQueue methods we support
192  @Override
193  public boolean offer(CallRunner callRunner) {
194    return queue.offer(callRunner);
195  }
196
197  @Override
198  public int size() {
199    return queue.size();
200  }
201
202  @Override
203  public String toString() {
204    return queue.toString();
205  }
206
207  // This class does NOT provide generic purpose BlockingQueue implementation,
208  // so to prevent misuse all other methods throw UnsupportedOperationException.
209
210  @Override
211  public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException {
212    throw new UnsupportedOperationException("This class doesn't support anything,"
213      + " but take() and offer() methods");
214  }
215
216
217  @Override
218  public CallRunner peek() {
219    throw new UnsupportedOperationException("This class doesn't support anything,"
220      + " but take() and offer() methods");
221  }
222
223  @Override
224  public boolean remove(Object o) {
225    throw new UnsupportedOperationException("This class doesn't support anything,"
226      + " but take() and offer() methods");
227  }
228
229  @Override
230  public boolean contains(Object o) {
231    throw new UnsupportedOperationException("This class doesn't support anything,"
232      + " but take() and offer() methods");
233  }
234
235  @Override
236  public Object[] toArray() {
237    throw new UnsupportedOperationException("This class doesn't support anything,"
238      + " but take() and offer() methods");
239  }
240
241  @Override
242  public <T> T[] toArray(T[] a) {
243    throw new UnsupportedOperationException("This class doesn't support anything,"
244      + " but take() and offer() methods");
245  }
246
247  @Override
248  public void clear() {
249    throw new UnsupportedOperationException("This class doesn't support anything,"
250      + " but take() and offer() methods");
251  }
252
253  @Override
254  public int drainTo(Collection<? super CallRunner> c) {
255    throw new UnsupportedOperationException("This class doesn't support anything,"
256      + " but take() and offer() methods");
257  }
258
259  @Override
260  public int drainTo(Collection<? super CallRunner> c, int maxElements) {
261    throw new UnsupportedOperationException("This class doesn't support anything,"
262      + " but take() and offer() methods");
263  }
264
265  @Override
266  public Iterator<CallRunner> iterator() {
267    throw new UnsupportedOperationException("This class doesn't support anything,"
268      + " but take() and offer() methods");
269  }
270
271  @Override
272  public boolean add(CallRunner callRunner) {
273    throw new UnsupportedOperationException("This class doesn't support anything,"
274      + " but take() and offer() methods");
275  }
276
277  @Override
278  public CallRunner remove() {
279    throw new UnsupportedOperationException("This class doesn't support anything,"
280      + " but take() and offer() methods");
281  }
282
283  @Override
284  public CallRunner element() {
285    throw new UnsupportedOperationException("This class doesn't support anything,"
286      + " but take() and offer() methods");
287  }
288
289  @Override
290  public boolean addAll(Collection<? extends CallRunner> c) {
291    throw new UnsupportedOperationException("This class doesn't support anything,"
292      + " but take() and offer() methods");
293  }
294
295  @Override
296  public boolean isEmpty() {
297    throw new UnsupportedOperationException("This class doesn't support anything,"
298      + " but take() and offer() methods");
299  }
300
301  @Override
302  public boolean containsAll(Collection<?> c) {
303    throw new UnsupportedOperationException("This class doesn't support anything,"
304      + " but take() and offer() methods");
305  }
306
307  @Override
308  public boolean removeAll(Collection<?> c) {
309    throw new UnsupportedOperationException("This class doesn't support anything,"
310      + " but take() and offer() methods");
311  }
312
313  @Override
314  public boolean retainAll(Collection<?> c) {
315    throw new UnsupportedOperationException("This class doesn't support anything,"
316      + " but take() and offer() methods");
317  }
318
319  @Override
320  public int remainingCapacity() {
321    throw new UnsupportedOperationException("This class doesn't support anything,"
322      + " but take() and offer() methods");
323  }
324
325  @Override
326  public void put(CallRunner callRunner) throws InterruptedException {
327    throw new UnsupportedOperationException("This class doesn't support anything,"
328      + " but take() and offer() methods");
329  }
330
331  @Override
332  public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit)
333      throws InterruptedException {
334    throw new UnsupportedOperationException("This class doesn't support anything,"
335      + " but take() and offer() methods");
336  }
337}