001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.Collection; 021import java.util.Iterator; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.LinkedBlockingDeque; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.LongAdder; 028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 029import org.apache.yetus.audience.InterfaceAudience; 030 031/** 032 * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading. Implementing 033 * {@link BlockingQueue} interface to be compatible with {@link RpcExecutor}. Currently uses 034 * milliseconds internally, need to look into whether we should use nanoseconds for timeInterval and 035 * minDelay. 036 * @see <a href="http://queue.acm.org/detail.cfm?id=2839461">Fail at Scale paper</a> 037 * @see <a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/Codel.cpp"> CoDel 038 * version for generic job queues in Wangle library</a> 039 */ 040@InterfaceAudience.Private 041public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> { 042 043 // backing queue 044 private LinkedBlockingDeque<CallRunner> queue; 045 046 // so we can calculate actual threshold to switch to LIFO under load 047 private volatile int softLimit; 048 049 // metrics (shared across all queues) 050 private LongAdder numGeneralCallsDropped; 051 private LongAdder numLifoModeSwitches; 052 053 // Both are in milliseconds 054 private volatile int codelTargetDelay; 055 private volatile int codelInterval; 056 057 // if queue if full more than that percent, we switch to LIFO mode. 058 // Values are in the range of 0.7, 0.8 etc (0-1.0). 059 private volatile double lifoThreshold; 060 061 // minimal delay observed during the interval 062 private volatile long minDelay; 063 064 // the moment when current interval ends 065 private volatile long intervalTime = EnvironmentEdgeManager.currentTime(); 066 067 // switch to ensure only one threads does interval cutoffs 068 private AtomicBoolean resetDelay = new AtomicBoolean(true); 069 070 // if we're in this mode, "long" calls are getting dropped 071 private AtomicBoolean isOverloaded = new AtomicBoolean(false); 072 073 public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval, 074 double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches, 075 int currentQueueLimit) { 076 this.queue = new LinkedBlockingDeque<>(capacity); 077 this.codelTargetDelay = targetDelay; 078 this.codelInterval = interval; 079 this.lifoThreshold = lifoThreshold; 080 this.numGeneralCallsDropped = numGeneralCallsDropped; 081 this.numLifoModeSwitches = numLifoModeSwitches; 082 this.softLimit = currentQueueLimit; 083 } 084 085 /** 086 * Update tunables. 087 * @param newCodelTargetDelay new CoDel target delay 088 * @param newCodelInterval new CoDel interval 089 * @param newLifoThreshold new Adaptive Lifo threshold 090 * @param currentQueueLimit new soft limit of queue 091 */ 092 public void updateTunables(int newCodelTargetDelay, int newCodelInterval, double newLifoThreshold, 093 int currentQueueLimit) { 094 this.codelTargetDelay = newCodelTargetDelay; 095 this.codelInterval = newCodelInterval; 096 this.lifoThreshold = newLifoThreshold; 097 this.softLimit = currentQueueLimit; 098 } 099 100 /** 101 * Behaves as {@link LinkedBlockingQueue#take()}, except it will silently skip all calls which it 102 * thinks should be dropped. 103 * @return the head of this queue 104 * @throws InterruptedException if interrupted while waiting 105 */ 106 @Override 107 public CallRunner take() throws InterruptedException { 108 CallRunner cr; 109 while (true) { 110 if (((double) queue.size() / this.softLimit) > lifoThreshold) { 111 numLifoModeSwitches.increment(); 112 cr = queue.takeLast(); 113 } else { 114 cr = queue.takeFirst(); 115 } 116 if (needToDrop(cr)) { 117 numGeneralCallsDropped.increment(); 118 cr.drop(); 119 } else { 120 return cr; 121 } 122 } 123 } 124 125 @Override 126 public CallRunner poll() { 127 CallRunner cr; 128 boolean switched = false; 129 while (true) { 130 if (((double) queue.size() / this.softLimit) > lifoThreshold) { 131 // Only count once per switch. 132 if (!switched) { 133 switched = true; 134 numLifoModeSwitches.increment(); 135 } 136 cr = queue.pollLast(); 137 } else { 138 switched = false; 139 cr = queue.pollFirst(); 140 } 141 if (cr == null) { 142 return cr; 143 } 144 if (needToDrop(cr)) { 145 numGeneralCallsDropped.increment(); 146 cr.drop(); 147 } else { 148 return cr; 149 } 150 } 151 } 152 153 /** 154 * @param callRunner to validate 155 * @return true if this call needs to be skipped based on call timestamp and internal queue state 156 * (deemed overloaded). 157 */ 158 private boolean needToDrop(CallRunner callRunner) { 159 long now = EnvironmentEdgeManager.currentTime(); 160 long callDelay = now - callRunner.getRpcCall().getReceiveTime(); 161 162 long localMinDelay = this.minDelay; 163 164 // Try and determine if we should reset 165 // the delay time and determine overload 166 if (now > intervalTime && !resetDelay.get() && !resetDelay.getAndSet(true)) { 167 intervalTime = now + codelInterval; 168 169 isOverloaded.set(localMinDelay > codelTargetDelay); 170 } 171 172 // If it looks like we should reset the delay 173 // time do it only once on one thread 174 if (resetDelay.get() && resetDelay.getAndSet(false)) { 175 minDelay = callDelay; 176 // we just reset the delay dunno about how this will work 177 return false; 178 } else if (callDelay < localMinDelay) { 179 minDelay = callDelay; 180 } 181 182 return isOverloaded.get() && callDelay > 2 * codelTargetDelay; 183 } 184 185 // Generic BlockingQueue methods we support 186 @Override 187 public boolean offer(CallRunner callRunner) { 188 return queue.offer(callRunner); 189 } 190 191 @Override 192 public int size() { 193 return queue.size(); 194 } 195 196 @Override 197 public String toString() { 198 return queue.toString(); 199 } 200 201 // This class does NOT provide generic purpose BlockingQueue implementation, 202 // so to prevent misuse all other methods throw UnsupportedOperationException. 203 204 @Override 205 public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException { 206 throw new UnsupportedOperationException( 207 "This class doesn't support anything," + " but take() and offer() methods"); 208 } 209 210 @Override 211 public CallRunner peek() { 212 throw new UnsupportedOperationException( 213 "This class doesn't support anything," + " but take() and offer() methods"); 214 } 215 216 @Override 217 public boolean remove(Object o) { 218 throw new UnsupportedOperationException( 219 "This class doesn't support anything," + " but take() and offer() methods"); 220 } 221 222 @Override 223 public boolean contains(Object o) { 224 throw new UnsupportedOperationException( 225 "This class doesn't support anything," + " but take() and offer() methods"); 226 } 227 228 @Override 229 public Object[] toArray() { 230 throw new UnsupportedOperationException( 231 "This class doesn't support anything," + " but take() and offer() methods"); 232 } 233 234 @Override 235 public <T> T[] toArray(T[] a) { 236 throw new UnsupportedOperationException( 237 "This class doesn't support anything," + " but take() and offer() methods"); 238 } 239 240 @Override 241 public void clear() { 242 throw new UnsupportedOperationException( 243 "This class doesn't support anything," + " but take() and offer() methods"); 244 } 245 246 @Override 247 public int drainTo(Collection<? super CallRunner> c) { 248 throw new UnsupportedOperationException( 249 "This class doesn't support anything," + " but take() and offer() methods"); 250 } 251 252 @Override 253 public int drainTo(Collection<? super CallRunner> c, int maxElements) { 254 throw new UnsupportedOperationException( 255 "This class doesn't support anything," + " but take() and offer() methods"); 256 } 257 258 @Override 259 public Iterator<CallRunner> iterator() { 260 throw new UnsupportedOperationException( 261 "This class doesn't support anything," + " but take() and offer() methods"); 262 } 263 264 @Override 265 public boolean add(CallRunner callRunner) { 266 throw new UnsupportedOperationException( 267 "This class doesn't support anything," + " but take() and offer() methods"); 268 } 269 270 @Override 271 public CallRunner remove() { 272 throw new UnsupportedOperationException( 273 "This class doesn't support anything," + " but take() and offer() methods"); 274 } 275 276 @Override 277 public CallRunner element() { 278 throw new UnsupportedOperationException( 279 "This class doesn't support anything," + " but take() and offer() methods"); 280 } 281 282 @Override 283 public boolean addAll(Collection<? extends CallRunner> c) { 284 throw new UnsupportedOperationException( 285 "This class doesn't support anything," + " but take() and offer() methods"); 286 } 287 288 @Override 289 public boolean isEmpty() { 290 throw new UnsupportedOperationException( 291 "This class doesn't support anything," + " but take() and offer() methods"); 292 } 293 294 @Override 295 public boolean containsAll(Collection<?> c) { 296 throw new UnsupportedOperationException( 297 "This class doesn't support anything," + " but take() and offer() methods"); 298 } 299 300 @Override 301 public boolean removeAll(Collection<?> c) { 302 throw new UnsupportedOperationException( 303 "This class doesn't support anything," + " but take() and offer() methods"); 304 } 305 306 @Override 307 public boolean retainAll(Collection<?> c) { 308 throw new UnsupportedOperationException( 309 "This class doesn't support anything," + " but take() and offer() methods"); 310 } 311 312 @Override 313 public int remainingCapacity() { 314 throw new UnsupportedOperationException( 315 "This class doesn't support anything," + " but take() and offer() methods"); 316 } 317 318 @Override 319 public void put(CallRunner callRunner) throws InterruptedException { 320 throw new UnsupportedOperationException( 321 "This class doesn't support anything," + " but take() and offer() methods"); 322 } 323 324 @Override 325 public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit) 326 throws InterruptedException { 327 throw new UnsupportedOperationException( 328 "This class doesn't support anything," + " but take() and offer() methods"); 329 } 330}