001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.util.Iterator; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.locks.Condition; 023import java.util.concurrent.locks.ReentrantLock; 024 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029@InterfaceAudience.Private 030public abstract class AbstractProcedureScheduler implements ProcedureScheduler { 031 private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class); 032 private final ReentrantLock schedulerLock = new ReentrantLock(); 033 private final Condition schedWaitCond = schedulerLock.newCondition(); 034 private boolean running = false; 035 036 // TODO: metrics 037 private long pollCalls = 0; 038 private long nullPollCalls = 0; 039 040 @Override 041 public void start() { 042 schedLock(); 043 try { 044 running = true; 045 } finally { 046 schedUnlock(); 047 } 048 } 049 050 @Override 051 public void stop() { 052 schedLock(); 053 try { 054 running = false; 055 schedWaitCond.signalAll(); 056 } finally { 057 schedUnlock(); 058 } 059 } 060 061 @Override 062 public void signalAll() { 063 schedLock(); 064 try { 065 schedWaitCond.signalAll(); 066 } finally { 067 schedUnlock(); 068 } 069 } 070 071 // ========================================================================== 072 // Add related 073 // ========================================================================== 074 /** 075 * Add the procedure to the queue. 076 * NOTE: this method is called with the sched lock held. 077 * @param procedure the Procedure to add 078 * @param addFront true if the item should be added to the front of the queue 079 */ 080 protected abstract void enqueue(Procedure procedure, boolean addFront); 081 082 @Override 083 public void addFront(final Procedure procedure) { 084 push(procedure, true, true); 085 } 086 087 @Override 088 public void addFront(final Procedure procedure, boolean notify) { 089 push(procedure, true, notify); 090 } 091 092 @Override 093 public void addFront(Iterator<Procedure> procedureIterator) { 094 schedLock(); 095 try { 096 int count = 0; 097 while (procedureIterator.hasNext()) { 098 Procedure procedure = procedureIterator.next(); 099 if (LOG.isTraceEnabled()) { 100 LOG.trace("Wake " + procedure); 101 } 102 push(procedure, /* addFront= */ true, /* notify= */false); 103 count++; 104 } 105 wakePollIfNeeded(count); 106 } finally { 107 schedUnlock(); 108 } 109 } 110 111 @Override 112 public void addBack(final Procedure procedure) { 113 push(procedure, false, true); 114 } 115 116 @Override 117 public void addBack(final Procedure procedure, boolean notify) { 118 push(procedure, false, notify); 119 } 120 121 protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { 122 schedLock(); 123 try { 124 enqueue(procedure, addFront); 125 if (notify) { 126 schedWaitCond.signal(); 127 } 128 } finally { 129 schedUnlock(); 130 } 131 } 132 133 // ========================================================================== 134 // Poll related 135 // ========================================================================== 136 /** 137 * Fetch one Procedure from the queue 138 * NOTE: this method is called with the sched lock held. 139 * @return the Procedure to execute, or null if nothing is available. 140 */ 141 protected abstract Procedure dequeue(); 142 143 @Override 144 public Procedure poll() { 145 return poll(-1); 146 } 147 148 @Override 149 public Procedure poll(long timeout, TimeUnit unit) { 150 return poll(unit.toNanos(timeout)); 151 } 152 153 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 154 public Procedure poll(final long nanos) { 155 schedLock(); 156 try { 157 if (!running) { 158 LOG.debug("the scheduler is not running"); 159 return null; 160 } 161 162 if (!queueHasRunnables()) { 163 // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller 164 // to take decisions after a wake/interruption. 165 if (nanos < 0) { 166 schedWaitCond.await(); 167 } else { 168 schedWaitCond.awaitNanos(nanos); 169 } 170 if (!queueHasRunnables()) { 171 nullPollCalls++; 172 return null; 173 } 174 } 175 final Procedure pollResult = dequeue(); 176 177 pollCalls++; 178 nullPollCalls += (pollResult == null) ? 1 : 0; 179 return pollResult; 180 } catch (InterruptedException e) { 181 Thread.currentThread().interrupt(); 182 nullPollCalls++; 183 return null; 184 } finally { 185 schedUnlock(); 186 } 187 } 188 189 // ========================================================================== 190 // Utils 191 // ========================================================================== 192 /** 193 * Returns the number of elements in this queue. 194 * NOTE: this method is called with the sched lock held. 195 * @return the number of elements in this queue. 196 */ 197 protected abstract int queueSize(); 198 199 /** 200 * Returns true if there are procedures available to process. 201 * NOTE: this method is called with the sched lock held. 202 * @return true if there are procedures available to process, otherwise false. 203 */ 204 protected abstract boolean queueHasRunnables(); 205 206 @Override 207 public int size() { 208 schedLock(); 209 try { 210 return queueSize(); 211 } finally { 212 schedUnlock(); 213 } 214 } 215 216 @Override 217 public boolean hasRunnables() { 218 schedLock(); 219 try { 220 return queueHasRunnables(); 221 } finally { 222 schedUnlock(); 223 } 224 } 225 226 // ============================================================================ 227 // TODO: Metrics 228 // ============================================================================ 229 public long getPollCalls() { 230 return pollCalls; 231 } 232 233 public long getNullPollCalls() { 234 return nullPollCalls; 235 } 236 237 // ========================================================================== 238 // Procedure Events 239 // ========================================================================== 240 241 /** 242 * Wake up all of the given events. 243 * Note that we first take scheduler lock and then wakeInternal() synchronizes on the event. 244 * Access should remain package-private. Use ProcedureEvent class to wake/suspend events. 245 * @param events the list of events to wake 246 */ 247 void wakeEvents(ProcedureEvent[] events) { 248 schedLock(); 249 try { 250 for (ProcedureEvent event : events) { 251 if (event == null) { 252 continue; 253 } 254 event.wakeInternal(this); 255 } 256 } finally { 257 schedUnlock(); 258 } 259 } 260 261 /** 262 * Wakes up given waiting procedures by pushing them back into scheduler queues. 263 * @return size of given {@code waitQueue}. 264 */ 265 protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) { 266 return lockAndQueue.wakeWaitingProcedures(this); 267 } 268 269 protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) { 270 lockAndQueue.addLast(proc); 271 } 272 273 protected void wakeProcedure(final Procedure procedure) { 274 LOG.trace("Wake {}", procedure); 275 push(procedure, /* addFront= */ true, /* notify= */false); 276 } 277 278 // ========================================================================== 279 // Internal helpers 280 // ========================================================================== 281 protected void schedLock() { 282 schedulerLock.lock(); 283 } 284 285 protected void schedUnlock() { 286 schedulerLock.unlock(); 287 } 288 289 protected void wakePollIfNeeded(final int waitingCount) { 290 if (waitingCount <= 0) { 291 return; 292 } 293 if (waitingCount == 1) { 294 schedWaitCond.signal(); 295 } else { 296 schedWaitCond.signalAll(); 297 } 298 } 299}