001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.Collection;
021import java.util.Iterator;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.LinkedBlockingDeque;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.LongAdder;
028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading. Implementing
033 * {@link BlockingQueue} interface to be compatible with {@link RpcExecutor}. Currently uses
034 * milliseconds internally, need to look into whether we should use nanoseconds for timeInterval and
035 * minDelay.
036 * @see <a href="http://queue.acm.org/detail.cfm?id=2839461">Fail at Scale paper</a>
037 * @see <a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/Codel.cpp"> CoDel
038 *      version for generic job queues in Wangle library</a>
039 */
040@InterfaceAudience.Private
041public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
042
043  // backing queue
044  private LinkedBlockingDeque<CallRunner> queue;
045
046  // so we can calculate actual threshold to switch to LIFO under load
047  private int maxCapacity;
048
049  // metrics (shared across all queues)
050  private LongAdder numGeneralCallsDropped;
051  private LongAdder numLifoModeSwitches;
052
053  // Both are in milliseconds
054  private volatile int codelTargetDelay;
055  private volatile int codelInterval;
056
057  // if queue if full more than that percent, we switch to LIFO mode.
058  // Values are in the range of 0.7, 0.8 etc (0-1.0).
059  private volatile double lifoThreshold;
060
061  // minimal delay observed during the interval
062  private volatile long minDelay;
063
064  // the moment when current interval ends
065  private volatile long intervalTime = EnvironmentEdgeManager.currentTime();
066
067  // switch to ensure only one threads does interval cutoffs
068  private AtomicBoolean resetDelay = new AtomicBoolean(true);
069
070  // if we're in this mode, "long" calls are getting dropped
071  private AtomicBoolean isOverloaded = new AtomicBoolean(false);
072
073  public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
074    double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches) {
075    this.maxCapacity = capacity;
076    this.queue = new LinkedBlockingDeque<>(capacity);
077    this.codelTargetDelay = targetDelay;
078    this.codelInterval = interval;
079    this.lifoThreshold = lifoThreshold;
080    this.numGeneralCallsDropped = numGeneralCallsDropped;
081    this.numLifoModeSwitches = numLifoModeSwitches;
082  }
083
084  /**
085   * Update tunables.
086   * @param newCodelTargetDelay new CoDel target delay
087   * @param newCodelInterval    new CoDel interval
088   * @param newLifoThreshold    new Adaptive Lifo threshold
089   */
090  public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
091    double newLifoThreshold) {
092    this.codelTargetDelay = newCodelTargetDelay;
093    this.codelInterval = newCodelInterval;
094    this.lifoThreshold = newLifoThreshold;
095  }
096
097  /**
098   * Behaves as {@link LinkedBlockingQueue#take()}, except it will silently skip all calls which it
099   * thinks should be dropped.
100   * @return the head of this queue
101   * @throws InterruptedException if interrupted while waiting
102   */
103  @Override
104  public CallRunner take() throws InterruptedException {
105    CallRunner cr;
106    while (true) {
107      if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
108        numLifoModeSwitches.increment();
109        cr = queue.takeLast();
110      } else {
111        cr = queue.takeFirst();
112      }
113      if (needToDrop(cr)) {
114        numGeneralCallsDropped.increment();
115        cr.drop();
116      } else {
117        return cr;
118      }
119    }
120  }
121
122  @Override
123  public CallRunner poll() {
124    CallRunner cr;
125    boolean switched = false;
126    while (true) {
127      if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
128        // Only count once per switch.
129        if (!switched) {
130          switched = true;
131          numLifoModeSwitches.increment();
132        }
133        cr = queue.pollLast();
134      } else {
135        switched = false;
136        cr = queue.pollFirst();
137      }
138      if (cr == null) {
139        return cr;
140      }
141      if (needToDrop(cr)) {
142        numGeneralCallsDropped.increment();
143        cr.drop();
144      } else {
145        return cr;
146      }
147    }
148  }
149
150  /**
151   * @param callRunner to validate
152   * @return true if this call needs to be skipped based on call timestamp and internal queue state
153   *         (deemed overloaded).
154   */
155  private boolean needToDrop(CallRunner callRunner) {
156    long now = EnvironmentEdgeManager.currentTime();
157    long callDelay = now - callRunner.getRpcCall().getReceiveTime();
158
159    long localMinDelay = this.minDelay;
160
161    // Try and determine if we should reset
162    // the delay time and determine overload
163    if (now > intervalTime && !resetDelay.get() && !resetDelay.getAndSet(true)) {
164      intervalTime = now + codelInterval;
165
166      isOverloaded.set(localMinDelay > codelTargetDelay);
167    }
168
169    // If it looks like we should reset the delay
170    // time do it only once on one thread
171    if (resetDelay.get() && resetDelay.getAndSet(false)) {
172      minDelay = callDelay;
173      // we just reset the delay dunno about how this will work
174      return false;
175    } else if (callDelay < localMinDelay) {
176      minDelay = callDelay;
177    }
178
179    return isOverloaded.get() && callDelay > 2 * codelTargetDelay;
180  }
181
182  // Generic BlockingQueue methods we support
183  @Override
184  public boolean offer(CallRunner callRunner) {
185    return queue.offer(callRunner);
186  }
187
188  @Override
189  public int size() {
190    return queue.size();
191  }
192
193  @Override
194  public String toString() {
195    return queue.toString();
196  }
197
198  // This class does NOT provide generic purpose BlockingQueue implementation,
199  // so to prevent misuse all other methods throw UnsupportedOperationException.
200
201  @Override
202  public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException {
203    throw new UnsupportedOperationException(
204      "This class doesn't support anything," + " but take() and offer() methods");
205  }
206
207  @Override
208  public CallRunner peek() {
209    throw new UnsupportedOperationException(
210      "This class doesn't support anything," + " but take() and offer() methods");
211  }
212
213  @Override
214  public boolean remove(Object o) {
215    throw new UnsupportedOperationException(
216      "This class doesn't support anything," + " but take() and offer() methods");
217  }
218
219  @Override
220  public boolean contains(Object o) {
221    throw new UnsupportedOperationException(
222      "This class doesn't support anything," + " but take() and offer() methods");
223  }
224
225  @Override
226  public Object[] toArray() {
227    throw new UnsupportedOperationException(
228      "This class doesn't support anything," + " but take() and offer() methods");
229  }
230
231  @Override
232  public <T> T[] toArray(T[] a) {
233    throw new UnsupportedOperationException(
234      "This class doesn't support anything," + " but take() and offer() methods");
235  }
236
237  @Override
238  public void clear() {
239    throw new UnsupportedOperationException(
240      "This class doesn't support anything," + " but take() and offer() methods");
241  }
242
243  @Override
244  public int drainTo(Collection<? super CallRunner> c) {
245    throw new UnsupportedOperationException(
246      "This class doesn't support anything," + " but take() and offer() methods");
247  }
248
249  @Override
250  public int drainTo(Collection<? super CallRunner> c, int maxElements) {
251    throw new UnsupportedOperationException(
252      "This class doesn't support anything," + " but take() and offer() methods");
253  }
254
255  @Override
256  public Iterator<CallRunner> iterator() {
257    throw new UnsupportedOperationException(
258      "This class doesn't support anything," + " but take() and offer() methods");
259  }
260
261  @Override
262  public boolean add(CallRunner callRunner) {
263    throw new UnsupportedOperationException(
264      "This class doesn't support anything," + " but take() and offer() methods");
265  }
266
267  @Override
268  public CallRunner remove() {
269    throw new UnsupportedOperationException(
270      "This class doesn't support anything," + " but take() and offer() methods");
271  }
272
273  @Override
274  public CallRunner element() {
275    throw new UnsupportedOperationException(
276      "This class doesn't support anything," + " but take() and offer() methods");
277  }
278
279  @Override
280  public boolean addAll(Collection<? extends CallRunner> c) {
281    throw new UnsupportedOperationException(
282      "This class doesn't support anything," + " but take() and offer() methods");
283  }
284
285  @Override
286  public boolean isEmpty() {
287    throw new UnsupportedOperationException(
288      "This class doesn't support anything," + " but take() and offer() methods");
289  }
290
291  @Override
292  public boolean containsAll(Collection<?> c) {
293    throw new UnsupportedOperationException(
294      "This class doesn't support anything," + " but take() and offer() methods");
295  }
296
297  @Override
298  public boolean removeAll(Collection<?> c) {
299    throw new UnsupportedOperationException(
300      "This class doesn't support anything," + " but take() and offer() methods");
301  }
302
303  @Override
304  public boolean retainAll(Collection<?> c) {
305    throw new UnsupportedOperationException(
306      "This class doesn't support anything," + " but take() and offer() methods");
307  }
308
309  @Override
310  public int remainingCapacity() {
311    throw new UnsupportedOperationException(
312      "This class doesn't support anything," + " but take() and offer() methods");
313  }
314
315  @Override
316  public void put(CallRunner callRunner) throws InterruptedException {
317    throw new UnsupportedOperationException(
318      "This class doesn't support anything," + " but take() and offer() methods");
319  }
320
321  @Override
322  public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit)
323    throws InterruptedException {
324    throw new UnsupportedOperationException(
325      "This class doesn't support anything," + " but take() and offer() methods");
326  }
327}