001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.Collection; 021import java.util.Iterator; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.LinkedBlockingDeque; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.LongAdder; 028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 029import org.apache.yetus.audience.InterfaceAudience; 030 031/** 032 * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading. Implementing 033 * {@link BlockingQueue} interface to be compatible with {@link RpcExecutor}. Currently uses 034 * milliseconds internally, need to look into whether we should use nanoseconds for timeInterval and 035 * minDelay. 036 * @see <a href="http://queue.acm.org/detail.cfm?id=2839461">Fail at Scale paper</a> 037 * @see <a href="https://github.com/facebook/wangle/blob/master/wangle/concurrent/Codel.cpp"> CoDel 038 * version for generic job queues in Wangle library</a> 039 */ 040@InterfaceAudience.Private 041public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> { 042 043 // backing queue 044 private LinkedBlockingDeque<CallRunner> queue; 045 046 // so we can calculate actual threshold to switch to LIFO under load 047 private int maxCapacity; 048 049 // metrics (shared across all queues) 050 private LongAdder numGeneralCallsDropped; 051 private LongAdder numLifoModeSwitches; 052 053 // Both are in milliseconds 054 private volatile int codelTargetDelay; 055 private volatile int codelInterval; 056 057 // if queue if full more than that percent, we switch to LIFO mode. 058 // Values are in the range of 0.7, 0.8 etc (0-1.0). 059 private volatile double lifoThreshold; 060 061 // minimal delay observed during the interval 062 private volatile long minDelay; 063 064 // the moment when current interval ends 065 private volatile long intervalTime = EnvironmentEdgeManager.currentTime(); 066 067 // switch to ensure only one threads does interval cutoffs 068 private AtomicBoolean resetDelay = new AtomicBoolean(true); 069 070 // if we're in this mode, "long" calls are getting dropped 071 private AtomicBoolean isOverloaded = new AtomicBoolean(false); 072 073 public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval, 074 double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches) { 075 this.maxCapacity = capacity; 076 this.queue = new LinkedBlockingDeque<>(capacity); 077 this.codelTargetDelay = targetDelay; 078 this.codelInterval = interval; 079 this.lifoThreshold = lifoThreshold; 080 this.numGeneralCallsDropped = numGeneralCallsDropped; 081 this.numLifoModeSwitches = numLifoModeSwitches; 082 } 083 084 /** 085 * Update tunables. 086 * @param newCodelTargetDelay new CoDel target delay 087 * @param newCodelInterval new CoDel interval 088 * @param newLifoThreshold new Adaptive Lifo threshold 089 */ 090 public void updateTunables(int newCodelTargetDelay, int newCodelInterval, 091 double newLifoThreshold) { 092 this.codelTargetDelay = newCodelTargetDelay; 093 this.codelInterval = newCodelInterval; 094 this.lifoThreshold = newLifoThreshold; 095 } 096 097 /** 098 * Behaves as {@link LinkedBlockingQueue#take()}, except it will silently skip all calls which it 099 * thinks should be dropped. 100 * @return the head of this queue 101 * @throws InterruptedException if interrupted while waiting 102 */ 103 @Override 104 public CallRunner take() throws InterruptedException { 105 CallRunner cr; 106 while (true) { 107 if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { 108 numLifoModeSwitches.increment(); 109 cr = queue.takeLast(); 110 } else { 111 cr = queue.takeFirst(); 112 } 113 if (needToDrop(cr)) { 114 numGeneralCallsDropped.increment(); 115 cr.drop(); 116 } else { 117 return cr; 118 } 119 } 120 } 121 122 @Override 123 public CallRunner poll() { 124 CallRunner cr; 125 boolean switched = false; 126 while (true) { 127 if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { 128 // Only count once per switch. 129 if (!switched) { 130 switched = true; 131 numLifoModeSwitches.increment(); 132 } 133 cr = queue.pollLast(); 134 } else { 135 switched = false; 136 cr = queue.pollFirst(); 137 } 138 if (cr == null) { 139 return cr; 140 } 141 if (needToDrop(cr)) { 142 numGeneralCallsDropped.increment(); 143 cr.drop(); 144 } else { 145 return cr; 146 } 147 } 148 } 149 150 /** 151 * @param callRunner to validate 152 * @return true if this call needs to be skipped based on call timestamp and internal queue state 153 * (deemed overloaded). 154 */ 155 private boolean needToDrop(CallRunner callRunner) { 156 long now = EnvironmentEdgeManager.currentTime(); 157 long callDelay = now - callRunner.getRpcCall().getReceiveTime(); 158 159 long localMinDelay = this.minDelay; 160 161 // Try and determine if we should reset 162 // the delay time and determine overload 163 if (now > intervalTime && !resetDelay.get() && !resetDelay.getAndSet(true)) { 164 intervalTime = now + codelInterval; 165 166 isOverloaded.set(localMinDelay > codelTargetDelay); 167 } 168 169 // If it looks like we should reset the delay 170 // time do it only once on one thread 171 if (resetDelay.get() && resetDelay.getAndSet(false)) { 172 minDelay = callDelay; 173 // we just reset the delay dunno about how this will work 174 return false; 175 } else if (callDelay < localMinDelay) { 176 minDelay = callDelay; 177 } 178 179 return isOverloaded.get() && callDelay > 2 * codelTargetDelay; 180 } 181 182 // Generic BlockingQueue methods we support 183 @Override 184 public boolean offer(CallRunner callRunner) { 185 return queue.offer(callRunner); 186 } 187 188 @Override 189 public int size() { 190 return queue.size(); 191 } 192 193 @Override 194 public String toString() { 195 return queue.toString(); 196 } 197 198 // This class does NOT provide generic purpose BlockingQueue implementation, 199 // so to prevent misuse all other methods throw UnsupportedOperationException. 200 201 @Override 202 public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException { 203 throw new UnsupportedOperationException( 204 "This class doesn't support anything," + " but take() and offer() methods"); 205 } 206 207 @Override 208 public CallRunner peek() { 209 throw new UnsupportedOperationException( 210 "This class doesn't support anything," + " but take() and offer() methods"); 211 } 212 213 @Override 214 public boolean remove(Object o) { 215 throw new UnsupportedOperationException( 216 "This class doesn't support anything," + " but take() and offer() methods"); 217 } 218 219 @Override 220 public boolean contains(Object o) { 221 throw new UnsupportedOperationException( 222 "This class doesn't support anything," + " but take() and offer() methods"); 223 } 224 225 @Override 226 public Object[] toArray() { 227 throw new UnsupportedOperationException( 228 "This class doesn't support anything," + " but take() and offer() methods"); 229 } 230 231 @Override 232 public <T> T[] toArray(T[] a) { 233 throw new UnsupportedOperationException( 234 "This class doesn't support anything," + " but take() and offer() methods"); 235 } 236 237 @Override 238 public void clear() { 239 throw new UnsupportedOperationException( 240 "This class doesn't support anything," + " but take() and offer() methods"); 241 } 242 243 @Override 244 public int drainTo(Collection<? super CallRunner> c) { 245 throw new UnsupportedOperationException( 246 "This class doesn't support anything," + " but take() and offer() methods"); 247 } 248 249 @Override 250 public int drainTo(Collection<? super CallRunner> c, int maxElements) { 251 throw new UnsupportedOperationException( 252 "This class doesn't support anything," + " but take() and offer() methods"); 253 } 254 255 @Override 256 public Iterator<CallRunner> iterator() { 257 throw new UnsupportedOperationException( 258 "This class doesn't support anything," + " but take() and offer() methods"); 259 } 260 261 @Override 262 public boolean add(CallRunner callRunner) { 263 throw new UnsupportedOperationException( 264 "This class doesn't support anything," + " but take() and offer() methods"); 265 } 266 267 @Override 268 public CallRunner remove() { 269 throw new UnsupportedOperationException( 270 "This class doesn't support anything," + " but take() and offer() methods"); 271 } 272 273 @Override 274 public CallRunner element() { 275 throw new UnsupportedOperationException( 276 "This class doesn't support anything," + " but take() and offer() methods"); 277 } 278 279 @Override 280 public boolean addAll(Collection<? extends CallRunner> c) { 281 throw new UnsupportedOperationException( 282 "This class doesn't support anything," + " but take() and offer() methods"); 283 } 284 285 @Override 286 public boolean isEmpty() { 287 throw new UnsupportedOperationException( 288 "This class doesn't support anything," + " but take() and offer() methods"); 289 } 290 291 @Override 292 public boolean containsAll(Collection<?> c) { 293 throw new UnsupportedOperationException( 294 "This class doesn't support anything," + " but take() and offer() methods"); 295 } 296 297 @Override 298 public boolean removeAll(Collection<?> c) { 299 throw new UnsupportedOperationException( 300 "This class doesn't support anything," + " but take() and offer() methods"); 301 } 302 303 @Override 304 public boolean retainAll(Collection<?> c) { 305 throw new UnsupportedOperationException( 306 "This class doesn't support anything," + " but take() and offer() methods"); 307 } 308 309 @Override 310 public int remainingCapacity() { 311 throw new UnsupportedOperationException( 312 "This class doesn't support anything," + " but take() and offer() methods"); 313 } 314 315 @Override 316 public void put(CallRunner callRunner) throws InterruptedException { 317 throw new UnsupportedOperationException( 318 "This class doesn't support anything," + " but take() and offer() methods"); 319 } 320 321 @Override 322 public boolean offer(CallRunner callRunner, long timeout, TimeUnit unit) 323 throws InterruptedException { 324 throw new UnsupportedOperationException( 325 "This class doesn't support anything," + " but take() and offer() methods"); 326 } 327}