001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.Collection;
021import java.util.Iterator;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.LinkedBlockingDeque;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.LongAdder;
028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading. Implementing
033 * {@link BlockingQueue} interface to be compatible with {@link RpcExecutor}. Currently uses
034 * milliseconds internally, need to look into whether we should use nanoseconds for timeInterval and
035 * minDelay.
036 * @see <a href="http://queue.acm.org/detail.cfm?id=2839461">Fail at Scale paper</a>
037 * @see <a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/Codel.cpp"> CoDel
038 *      version for generic job queues in Wangle library</a>
039 */
040@InterfaceAudience.Private
041public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
042
043  // backing queue
044  private LinkedBlockingDeque<CallRunner> queue;
045
046  // so we can calculate actual threshold to switch to LIFO under load
047  private volatile int softLimit;
048
049  // metrics (shared across all queues)
050  private LongAdder numGeneralCallsDropped;
051  private LongAdder numLifoModeSwitches;
052
053  // Both are in milliseconds
054  private volatile int codelTargetDelay;
055  private volatile int codelInterval;
056
057  // if queue if full more than that percent, we switch to LIFO mode.
058  // Values are in the range of 0.7, 0.8 etc (0-1.0).
059  private volatile double lifoThreshold;
060
061  // minimal delay observed during the interval
062  private volatile long minDelay;
063
064  // the moment when current interval ends
065  private volatile long intervalTime = EnvironmentEdgeManager.currentTime();
066
067  // switch to ensure only one threads does interval cutoffs
068  private AtomicBoolean resetDelay = new AtomicBoolean(true);
069
070  // if we're in this mode, "long" calls are getting dropped
071  private AtomicBoolean isOverloaded = new AtomicBoolean(false);
072
073  public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
074    double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches,
075    int currentQueueLimit) {
076    this.queue = new LinkedBlockingDeque<>(capacity);
077    this.codelTargetDelay = targetDelay;
078    this.codelInterval = interval;
079    this.lifoThreshold = lifoThreshold;
080    this.numGeneralCallsDropped = numGeneralCallsDropped;
081    this.numLifoModeSwitches = numLifoModeSwitches;
082    this.softLimit = currentQueueLimit;
083  }
084
085  /**
086   * Update tunables.
087   * @param newCodelTargetDelay new CoDel target delay
088   * @param newCodelInterval    new CoDel interval
089   * @param newLifoThreshold    new Adaptive Lifo threshold
090   * @param currentQueueLimit   new soft limit of queue
091   */
092  public void updateTunables(int newCodelTargetDelay, int newCodelInterval, double newLifoThreshold,
093    int currentQueueLimit) {
094    this.codelTargetDelay = newCodelTargetDelay;
095    this.codelInterval = newCodelInterval;
096    this.lifoThreshold = newLifoThreshold;
097    this.softLimit = currentQueueLimit;
098  }
099
100  /**
101   * Behaves as {@link LinkedBlockingQueue#take()}, except it will silently skip all calls which it
102   * thinks should be dropped.
103   * @return the head of this queue
104   * @throws InterruptedException if interrupted while waiting
105   */
106  @Override
107  public CallRunner take() throws InterruptedException {
108    CallRunner cr;
109    while (true) {
110      if (((double) queue.size() / this.softLimit) > lifoThreshold) {
111        numLifoModeSwitches.increment();
112        cr = queue.takeLast();
113      } else {
114        cr = queue.takeFirst();
115      }
116      if (needToDrop(cr)) {
117        numGeneralCallsDropped.increment();
118        cr.drop();
119      } else {
120        return cr;
121      }
122    }
123  }
124
125  @Override
126  public CallRunner poll() {
127    CallRunner cr;
128    boolean switched = false;
129    while (true) {
130      if (((double) queue.size() / this.softLimit) > lifoThreshold) {
131        // Only count once per switch.
132        if (!switched) {
133          switched = true;
134          numLifoModeSwitches.increment();
135        }
136        cr = queue.pollLast();
137      } else {
138        switched = false;
139        cr = queue.pollFirst();
140      }
141      if (cr == null) {
142        return cr;
143      }
144      if (needToDrop(cr)) {
145        numGeneralCallsDropped.increment();
146        cr.drop();
147      } else {
148        return cr;
149      }
150    }
151  }
152
153  /**
154   * @param callRunner to validate
155   * @return true if this call needs to be skipped based on call timestamp and internal queue state
156   *         (deemed overloaded).
157   */
158  private boolean needToDrop(CallRunner callRunner) {
159    long now = EnvironmentEdgeManager.currentTime();
160    long callDelay = now - callRunner.getRpcCall().getReceiveTime();
161
162    long localMinDelay = this.minDelay;
163
164    // Try and determine if we should reset
165    // the delay time and determine overload
166    if (now > intervalTime && !resetDelay.get() && !resetDelay.getAndSet(true)) {
167      intervalTime = now + codelInterval;
168
169      isOverloaded.set(localMinDelay > codelTargetDelay);
170    }
171
172    // If it looks like we should reset the delay
173    // time do it only once on one thread
174    if (resetDelay.get() && resetDelay.getAndSet(false)) {
175      minDelay = callDelay;
176      // we just reset the delay dunno about how this will work
177      return false;
178    } else if (callDelay < localMinDelay) {
179      minDelay = callDelay;
180    }
181
182    return isOverloaded.get() && callDelay > 2 * codelTargetDelay;
183  }
184
185  // Generic BlockingQueue methods we support
186  @Override
187  public boolean offer(CallRunner callRunner) {
188    return queue.offer(callRunner);
189  }
190
191  @Override
192  public int size() {
193    return queue.size();
194  }
195
196  @Override
197  public String toString() {
198    return queue.toString();
199  }
200
201  // This class does NOT provide generic purpose BlockingQueue implementation,
202  // so to prevent misuse all other methods throw UnsupportedOperationException.
203
204  @Override
205  public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException {
206    throw new UnsupportedOperationException(
207      "This class doesn't support anything," + " but take() and offer() methods");
208  }
209
210  @Override
211  public CallRunner peek() {
212    throw new UnsupportedOperationException(
213      "This class doesn't support anything," + " but take() and offer() methods");
214  }
215
216  @Override
217  public boolean remove(Object o) {
218    throw new UnsupportedOperationException(
219      "This class doesn't support anything," + " but take() and offer() methods");
220  }
221
222  @Override
223  public boolean contains(Object o) {
224    throw new UnsupportedOperationException(
225      "This class doesn't support anything," + " but take() and offer() methods");
226  }
227
228  @Override
229  public Object[] toArray() {
230    throw new UnsupportedOperationException(
231      "This class doesn't support anything," + " but take() and offer() methods");
232  }
233
234  @Override
235  public <T> T[] toArray(T[] a) {
236    throw new UnsupportedOperationException(
237      "This class doesn't support anything," + " but take() and offer() methods");
238  }
239
240  @Override
241  public void clear() {
242    throw new UnsupportedOperationException(
243      "This class doesn't support anything," + " but take() and offer() methods");
244  }
245
246  @Override
247  public int drainTo(Collection<? super CallRunner> c) {
248    throw new UnsupportedOperationException(
249      "This class doesn't support anything," + " but take() and offer() methods");
250  }
251
252  @Override
253  public int drainTo(Collection<? super CallRunner> c, int maxElements) {
254    throw new UnsupportedOperationException(
255      "This class doesn't support anything," + " but take() and offer() methods");
256  }
257
258  @Override
259  public Iterator<CallRunner> iterator() {
260    throw new UnsupportedOperationException(
261      "This class doesn't support anything," + " but take() and offer() methods");
262  }
263
264  @Override
265  public boolean add(CallRunner callRunner) {
266    throw new UnsupportedOperationException(
267      "This class doesn't support anything," + " but take() and offer() methods");
268  }
269
270  @Override
271  public CallRunner remove() {
272    throw new UnsupportedOperationException(
273      "This class doesn't support anything," + " but take() and offer() methods");
274  }
275
276  @Override
277  public CallRunner element() {
278    throw new UnsupportedOperationException(
279      "This class doesn't support anything," + " but take() and offer() methods");
280  }
281
282  @Override
283  public boolean addAll(Collection<? extends CallRunner> c) {
284    throw new UnsupportedOperationException(
285      "This class doesn't support anything," + " but take() and offer() methods");
286  }
287
288  @Override
289  public boolean isEmpty() {
290    throw new UnsupportedOperationException(
291      "This class doesn't support anything," + " but take() and offer() methods");
292  }
293
294  @Override
295  public boolean containsAll(Collection<?> c) {
296    throw new UnsupportedOperationException(
297      "This class doesn't support anything," + " but take() and offer() methods");
298  }
299
300  @Override
301  public boolean removeAll(Collection<?> c) {
302    throw new UnsupportedOperationException(
303      "This class doesn't support anything," + " but take() and offer() methods");
304  }
305
306  @Override
307  public boolean retainAll(Collection<?> c) {
308    throw new UnsupportedOperationException(
309      "This class doesn't support anything," + " but take() and offer() methods");
310  }
311
312  @Override
313  public int remainingCapacity() {
314    throw new UnsupportedOperationException(
315      "This class doesn't support anything," + " but take() and offer() methods");
316  }
317
318  @Override
319  public void put(CallRunner callRunner) throws InterruptedException {
320    throw new UnsupportedOperationException(
321      "This class doesn't support anything," + " but take() and offer() methods");
322  }
323
324  @Override
325  public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit)
326    throws InterruptedException {
327    throw new UnsupportedOperationException(
328      "This class doesn't support anything," + " but take() and offer() methods");
329  }
330}