001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import java.util.Iterator; 022import java.util.concurrent.locks.Condition; 023import java.util.concurrent.locks.ReentrantLock; 024import java.util.concurrent.TimeUnit; 025 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030@InterfaceAudience.Private 031public abstract class AbstractProcedureScheduler implements ProcedureScheduler { 032 private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class); 033 private final ReentrantLock schedulerLock = new ReentrantLock(); 034 private final Condition schedWaitCond = schedulerLock.newCondition(); 035 private boolean running = false; 036 037 // TODO: metrics 038 private long pollCalls = 0; 039 private long nullPollCalls = 0; 040 041 @Override 042 public void start() { 043 schedLock(); 044 try { 045 running = true; 046 } finally { 047 schedUnlock(); 048 } 049 } 050 051 @Override 052 public void stop() { 053 schedLock(); 054 try { 055 running = false; 056 schedWaitCond.signalAll(); 057 } finally { 058 schedUnlock(); 059 } 060 } 061 062 @Override 063 public void signalAll() { 064 schedLock(); 065 try { 066 schedWaitCond.signalAll(); 067 } finally { 068 schedUnlock(); 069 } 070 } 071 072 // ========================================================================== 073 // Add related 074 // ========================================================================== 075 /** 076 * Add the procedure to the queue. 077 * NOTE: this method is called with the sched lock held. 078 * @param procedure the Procedure to add 079 * @param addFront true if the item should be added to the front of the queue 080 */ 081 protected abstract void enqueue(Procedure procedure, boolean addFront); 082 083 @Override 084 public void addFront(final Procedure procedure) { 085 push(procedure, true, true); 086 } 087 088 @Override 089 public void addFront(final Procedure procedure, boolean notify) { 090 push(procedure, true, notify); 091 } 092 093 @Override 094 public void addFront(Iterator<Procedure> procedureIterator) { 095 schedLock(); 096 try { 097 int count = 0; 098 while (procedureIterator.hasNext()) { 099 Procedure procedure = procedureIterator.next(); 100 if (LOG.isTraceEnabled()) { 101 LOG.trace("Wake " + procedure); 102 } 103 push(procedure, /* addFront= */ true, /* notify= */false); 104 count++; 105 } 106 wakePollIfNeeded(count); 107 } finally { 108 schedUnlock(); 109 } 110 } 111 112 @Override 113 public void addBack(final Procedure procedure) { 114 push(procedure, false, true); 115 } 116 117 @Override 118 public void addBack(final Procedure procedure, boolean notify) { 119 push(procedure, false, notify); 120 } 121 122 protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { 123 schedLock(); 124 try { 125 enqueue(procedure, addFront); 126 if (notify) { 127 schedWaitCond.signalAll(); 128 } 129 } finally { 130 schedUnlock(); 131 } 132 } 133 134 // ========================================================================== 135 // Poll related 136 // ========================================================================== 137 /** 138 * Fetch one Procedure from the queue 139 * NOTE: this method is called with the sched lock held. 140 * @return the Procedure to execute, or null if nothing is available. 141 */ 142 protected Procedure dequeue() { 143 return dequeue(false); 144 } 145 146 protected abstract Procedure dequeue(boolean onlyUrgent); 147 148 149 @Override 150 public Procedure poll(boolean onlyUrgent) { 151 return poll(onlyUrgent, -1); 152 } 153 154 @Override 155 public Procedure poll() { 156 return poll(false, -1); 157 } 158 159 @Override 160 public Procedure poll(boolean onlyUrgent, long timeout, TimeUnit unit) { 161 return poll(onlyUrgent, unit.toNanos(timeout)); 162 } 163 164 @Override 165 public Procedure poll(long timeout, TimeUnit unit) { 166 return poll(false, unit.toNanos(timeout)); 167 } 168 169 public Procedure poll(final long nanos) { 170 return poll(false, nanos); 171 } 172 173 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 174 public Procedure poll(final boolean onlyUrgent, final long nanos) { 175 schedLock(); 176 try { 177 if (!running) { 178 LOG.debug("the scheduler is not running"); 179 return null; 180 } 181 182 if (!queueHasRunnables()) { 183 // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller 184 // to take decisions after a wake/interruption. 185 if (nanos < 0) { 186 schedWaitCond.await(); 187 } else { 188 schedWaitCond.awaitNanos(nanos); 189 } 190 if (!queueHasRunnables()) { 191 nullPollCalls++; 192 return null; 193 } 194 } 195 final Procedure pollResult = dequeue(onlyUrgent); 196 197 pollCalls++; 198 nullPollCalls += (pollResult == null) ? 1 : 0; 199 return pollResult; 200 } catch (InterruptedException e) { 201 Thread.currentThread().interrupt(); 202 nullPollCalls++; 203 return null; 204 } finally { 205 schedUnlock(); 206 } 207 } 208 209 // ========================================================================== 210 // Utils 211 // ========================================================================== 212 /** 213 * Returns the number of elements in this queue. 214 * NOTE: this method is called with the sched lock held. 215 * @return the number of elements in this queue. 216 */ 217 protected abstract int queueSize(); 218 219 /** 220 * Returns true if there are procedures available to process. 221 * NOTE: this method is called with the sched lock held. 222 * @return true if there are procedures available to process, otherwise false. 223 */ 224 protected abstract boolean queueHasRunnables(); 225 226 @Override 227 public int size() { 228 schedLock(); 229 try { 230 return queueSize(); 231 } finally { 232 schedUnlock(); 233 } 234 } 235 236 @Override 237 public boolean hasRunnables() { 238 schedLock(); 239 try { 240 return queueHasRunnables(); 241 } finally { 242 schedUnlock(); 243 } 244 } 245 246 // ============================================================================ 247 // TODO: Metrics 248 // ============================================================================ 249 public long getPollCalls() { 250 return pollCalls; 251 } 252 253 public long getNullPollCalls() { 254 return nullPollCalls; 255 } 256 257 // ========================================================================== 258 // Procedure Events 259 // ========================================================================== 260 261 /** 262 * Wake up all of the given events. 263 * Note that we first take scheduler lock and then wakeInternal() synchronizes on the event. 264 * Access should remain package-private. Use ProcedureEvent class to wake/suspend events. 265 * @param events the list of events to wake 266 */ 267 void wakeEvents(ProcedureEvent[] events) { 268 schedLock(); 269 try { 270 for (ProcedureEvent event : events) { 271 if (event == null) { 272 continue; 273 } 274 event.wakeInternal(this); 275 } 276 } finally { 277 schedUnlock(); 278 } 279 } 280 281 /** 282 * Wakes up given waiting procedures by pushing them back into scheduler queues. 283 * @return size of given {@code waitQueue}. 284 */ 285 protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) { 286 return lockAndQueue.wakeWaitingProcedures(this); 287 } 288 289 protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) { 290 lockAndQueue.addLast(proc); 291 } 292 293 protected void wakeProcedure(final Procedure procedure) { 294 LOG.trace("Wake {}", procedure); 295 push(procedure, /* addFront= */ true, /* notify= */false); 296 } 297 298 299 // ========================================================================== 300 // Internal helpers 301 // ========================================================================== 302 protected void schedLock() { 303 schedulerLock.lock(); 304 } 305 306 protected void schedUnlock() { 307 schedulerLock.unlock(); 308 } 309 310 protected void wakePollIfNeeded(final int waitingCount) { 311 if (waitingCount <= 0) { 312 return; 313 } 314 schedWaitCond.signalAll(); 315 } 316}