001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.Collection; 021import java.util.Iterator; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.LinkedBlockingDeque; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.LongAdder; 028 029import org.apache.yetus.audience.InterfaceAudience; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031 032/** 033 * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading. 034 * 035 * Implementing {@link BlockingQueue} interface to be compatible with {@link RpcExecutor}. 036 * 037 * Currently uses milliseconds internally, need to look into whether we should use 038 * nanoseconds for timeInterval and minDelay. 039 * 040 * @see <a href="http://queue.acm.org/detail.cfm?id=2839461">Fail at Scale paper</a> 041 * 042 * @see <a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/Codel.cpp"> 043 * CoDel version for generic job queues in Wangle library</a> 044 */ 045@InterfaceAudience.Private 046public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> { 047 048 // backing queue 049 private LinkedBlockingDeque<CallRunner> queue; 050 051 // so we can calculate actual threshold to switch to LIFO under load 052 private int maxCapacity; 053 054 // metrics (shared across all queues) 055 private LongAdder numGeneralCallsDropped; 056 private LongAdder numLifoModeSwitches; 057 058 // Both are in milliseconds 059 private volatile int codelTargetDelay; 060 private volatile int codelInterval; 061 062 // if queue if full more than that percent, we switch to LIFO mode. 063 // Values are in the range of 0.7, 0.8 etc (0-1.0). 064 private volatile double lifoThreshold; 065 066 // minimal delay observed during the interval 067 private volatile long minDelay; 068 069 // the moment when current interval ends 070 private volatile long intervalTime = EnvironmentEdgeManager.currentTime(); 071 072 // switch to ensure only one threads does interval cutoffs 073 private AtomicBoolean resetDelay = new AtomicBoolean(true); 074 075 // if we're in this mode, "long" calls are getting dropped 076 private AtomicBoolean isOverloaded = new AtomicBoolean(false); 077 078 public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval, 079 double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches) { 080 this.maxCapacity = capacity; 081 this.queue = new LinkedBlockingDeque<>(capacity); 082 this.codelTargetDelay = targetDelay; 083 this.codelInterval = interval; 084 this.lifoThreshold = lifoThreshold; 085 this.numGeneralCallsDropped = numGeneralCallsDropped; 086 this.numLifoModeSwitches = numLifoModeSwitches; 087 } 088 089 /** 090 * Update tunables. 091 * 092 * @param newCodelTargetDelay new CoDel target delay 093 * @param newCodelInterval new CoDel interval 094 * @param newLifoThreshold new Adaptive Lifo threshold 095 */ 096 public void updateTunables(int newCodelTargetDelay, int newCodelInterval, 097 double newLifoThreshold) { 098 this.codelTargetDelay = newCodelTargetDelay; 099 this.codelInterval = newCodelInterval; 100 this.lifoThreshold = newLifoThreshold; 101 } 102 103 /** 104 * Behaves as {@link LinkedBlockingQueue#take()}, except it will silently 105 * skip all calls which it thinks should be dropped. 106 * 107 * @return the head of this queue 108 * @throws InterruptedException if interrupted while waiting 109 */ 110 @Override 111 public CallRunner take() throws InterruptedException { 112 CallRunner cr; 113 while(true) { 114 if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { 115 numLifoModeSwitches.increment(); 116 cr = queue.takeLast(); 117 } else { 118 cr = queue.takeFirst(); 119 } 120 if (needToDrop(cr)) { 121 numGeneralCallsDropped.increment(); 122 cr.drop(); 123 } else { 124 return cr; 125 } 126 } 127 } 128 129 @Override 130 public CallRunner poll() { 131 CallRunner cr; 132 boolean switched = false; 133 while(true) { 134 if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { 135 // Only count once per switch. 136 if (!switched) { 137 switched = true; 138 numLifoModeSwitches.increment(); 139 } 140 cr = queue.pollLast(); 141 } else { 142 switched = false; 143 cr = queue.pollFirst(); 144 } 145 if (cr == null) { 146 return cr; 147 } 148 if (needToDrop(cr)) { 149 numGeneralCallsDropped.increment(); 150 cr.drop(); 151 } else { 152 return cr; 153 } 154 } 155 } 156 157 /** 158 * @param callRunner to validate 159 * @return true if this call needs to be skipped based on call timestamp 160 * and internal queue state (deemed overloaded). 161 */ 162 private boolean needToDrop(CallRunner callRunner) { 163 long now = EnvironmentEdgeManager.currentTime(); 164 long callDelay = now - callRunner.getRpcCall().getReceiveTime(); 165 166 long localMinDelay = this.minDelay; 167 168 // Try and determine if we should reset 169 // the delay time and determine overload 170 if (now > intervalTime && 171 !resetDelay.get() && 172 !resetDelay.getAndSet(true)) { 173 intervalTime = now + codelInterval; 174 175 isOverloaded.set(localMinDelay > codelTargetDelay); 176 } 177 178 // If it looks like we should reset the delay 179 // time do it only once on one thread 180 if (resetDelay.get() && resetDelay.getAndSet(false)) { 181 minDelay = callDelay; 182 // we just reset the delay dunno about how this will work 183 return false; 184 } else if (callDelay < localMinDelay) { 185 minDelay = callDelay; 186 } 187 188 return isOverloaded.get() && callDelay > 2 * codelTargetDelay; 189 } 190 191 // Generic BlockingQueue methods we support 192 @Override 193 public boolean offer(CallRunner callRunner) { 194 return queue.offer(callRunner); 195 } 196 197 @Override 198 public int size() { 199 return queue.size(); 200 } 201 202 @Override 203 public String toString() { 204 return queue.toString(); 205 } 206 207 // This class does NOT provide generic purpose BlockingQueue implementation, 208 // so to prevent misuse all other methods throw UnsupportedOperationException. 209 210 @Override 211 public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException { 212 throw new UnsupportedOperationException("This class doesn't support anything," 213 + " but take() and offer() methods"); 214 } 215 216 217 @Override 218 public CallRunner peek() { 219 throw new UnsupportedOperationException("This class doesn't support anything," 220 + " but take() and offer() methods"); 221 } 222 223 @Override 224 public boolean remove(Object o) { 225 throw new UnsupportedOperationException("This class doesn't support anything," 226 + " but take() and offer() methods"); 227 } 228 229 @Override 230 public boolean contains(Object o) { 231 throw new UnsupportedOperationException("This class doesn't support anything," 232 + " but take() and offer() methods"); 233 } 234 235 @Override 236 public Object[] toArray() { 237 throw new UnsupportedOperationException("This class doesn't support anything," 238 + " but take() and offer() methods"); 239 } 240 241 @Override 242 public <T> T[] toArray(T[] a) { 243 throw new UnsupportedOperationException("This class doesn't support anything," 244 + " but take() and offer() methods"); 245 } 246 247 @Override 248 public void clear() { 249 throw new UnsupportedOperationException("This class doesn't support anything," 250 + " but take() and offer() methods"); 251 } 252 253 @Override 254 public int drainTo(Collection<? super CallRunner> c) { 255 throw new UnsupportedOperationException("This class doesn't support anything," 256 + " but take() and offer() methods"); 257 } 258 259 @Override 260 public int drainTo(Collection<? super CallRunner> c, int maxElements) { 261 throw new UnsupportedOperationException("This class doesn't support anything," 262 + " but take() and offer() methods"); 263 } 264 265 @Override 266 public Iterator<CallRunner> iterator() { 267 throw new UnsupportedOperationException("This class doesn't support anything," 268 + " but take() and offer() methods"); 269 } 270 271 @Override 272 public boolean add(CallRunner callRunner) { 273 throw new UnsupportedOperationException("This class doesn't support anything," 274 + " but take() and offer() methods"); 275 } 276 277 @Override 278 public CallRunner remove() { 279 throw new UnsupportedOperationException("This class doesn't support anything," 280 + " but take() and offer() methods"); 281 } 282 283 @Override 284 public CallRunner element() { 285 throw new UnsupportedOperationException("This class doesn't support anything," 286 + " but take() and offer() methods"); 287 } 288 289 @Override 290 public boolean addAll(Collection<? extends CallRunner> c) { 291 throw new UnsupportedOperationException("This class doesn't support anything," 292 + " but take() and offer() methods"); 293 } 294 295 @Override 296 public boolean isEmpty() { 297 throw new UnsupportedOperationException("This class doesn't support anything," 298 + " but take() and offer() methods"); 299 } 300 301 @Override 302 public boolean containsAll(Collection<?> c) { 303 throw new UnsupportedOperationException("This class doesn't support anything," 304 + " but take() and offer() methods"); 305 } 306 307 @Override 308 public boolean removeAll(Collection<?> c) { 309 throw new UnsupportedOperationException("This class doesn't support anything," 310 + " but take() and offer() methods"); 311 } 312 313 @Override 314 public boolean retainAll(Collection<?> c) { 315 throw new UnsupportedOperationException("This class doesn't support anything," 316 + " but take() and offer() methods"); 317 } 318 319 @Override 320 public int remainingCapacity() { 321 throw new UnsupportedOperationException("This class doesn't support anything," 322 + " but take() and offer() methods"); 323 } 324 325 @Override 326 public void put(CallRunner callRunner) throws InterruptedException { 327 throw new UnsupportedOperationException("This class doesn't support anything," 328 + " but take() and offer() methods"); 329 } 330 331 @Override 332 public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit) 333 throws InterruptedException { 334 throw new UnsupportedOperationException("This class doesn't support anything," 335 + " but take() and offer() methods"); 336 } 337}