001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.util.Iterator; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.locks.Condition; 023import java.util.concurrent.locks.ReentrantLock; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028@InterfaceAudience.Private 029public abstract class AbstractProcedureScheduler implements ProcedureScheduler { 030 private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class); 031 private final ReentrantLock schedulerLock = new ReentrantLock(); 032 private final Condition schedWaitCond = schedulerLock.newCondition(); 033 private boolean running = false; 034 035 // TODO: metrics 036 private long pollCalls = 0; 037 private long nullPollCalls = 0; 038 039 @Override 040 public void start() { 041 schedLock(); 042 try { 043 running = true; 044 } finally { 045 schedUnlock(); 046 } 047 } 048 049 @Override 050 public void stop() { 051 schedLock(); 052 try { 053 running = false; 054 schedWaitCond.signalAll(); 055 } finally { 056 schedUnlock(); 057 } 058 } 059 060 @Override 061 public void signalAll() { 062 schedLock(); 063 try { 064 schedWaitCond.signalAll(); 065 } finally { 066 schedUnlock(); 067 } 068 } 069 070 // ========================================================================== 071 // Add related 072 // ========================================================================== 073 /** 074 * Add the procedure to the queue. NOTE: this method is called with the sched lock held. 075 * @param procedure the Procedure to add 076 * @param addFront true if the item should be added to the front of the queue 077 */ 078 protected abstract void enqueue(Procedure procedure, boolean addFront); 079 080 @Override 081 public void addFront(final Procedure procedure) { 082 push(procedure, true, true); 083 } 084 085 @Override 086 public void addFront(final Procedure procedure, boolean notify) { 087 push(procedure, true, notify); 088 } 089 090 @Override 091 public void addFront(Iterator<Procedure> procedureIterator) { 092 schedLock(); 093 try { 094 int count = 0; 095 while (procedureIterator.hasNext()) { 096 Procedure procedure = procedureIterator.next(); 097 if (LOG.isTraceEnabled()) { 098 LOG.trace("Wake " + procedure); 099 } 100 push(procedure, /* addFront= */ true, /* notify= */false); 101 count++; 102 } 103 wakePollIfNeeded(count); 104 } finally { 105 schedUnlock(); 106 } 107 } 108 109 @Override 110 public void addBack(final Procedure procedure) { 111 push(procedure, false, true); 112 } 113 114 @Override 115 public void addBack(final Procedure procedure, boolean notify) { 116 push(procedure, false, notify); 117 } 118 119 protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { 120 schedLock(); 121 try { 122 enqueue(procedure, addFront); 123 if (notify) { 124 schedWaitCond.signal(); 125 } 126 } finally { 127 schedUnlock(); 128 } 129 } 130 131 // ========================================================================== 132 // Poll related 133 // ========================================================================== 134 /** 135 * Fetch one Procedure from the queue NOTE: this method is called with the sched lock held. 136 * @return the Procedure to execute, or null if nothing is available. 137 */ 138 protected abstract Procedure dequeue(); 139 140 @Override 141 public Procedure poll() { 142 return poll(-1); 143 } 144 145 @Override 146 public Procedure poll(long timeout, TimeUnit unit) { 147 return poll(unit.toNanos(timeout)); 148 } 149 150 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 151 public Procedure poll(final long nanos) { 152 schedLock(); 153 try { 154 if (!running) { 155 LOG.debug("the scheduler is not running"); 156 return null; 157 } 158 159 if (!queueHasRunnables()) { 160 // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller 161 // to take decisions after a wake/interruption. 162 if (nanos < 0) { 163 schedWaitCond.await(); 164 } else { 165 schedWaitCond.awaitNanos(nanos); 166 } 167 if (!queueHasRunnables()) { 168 nullPollCalls++; 169 return null; 170 } 171 } 172 final Procedure pollResult = dequeue(); 173 174 pollCalls++; 175 nullPollCalls += (pollResult == null) ? 1 : 0; 176 return pollResult; 177 } catch (InterruptedException e) { 178 Thread.currentThread().interrupt(); 179 nullPollCalls++; 180 return null; 181 } finally { 182 schedUnlock(); 183 } 184 } 185 186 // ========================================================================== 187 // Utils 188 // ========================================================================== 189 /** 190 * Returns the number of elements in this queue. NOTE: this method is called with the sched lock 191 * held. 192 * @return the number of elements in this queue. 193 */ 194 protected abstract int queueSize(); 195 196 /** 197 * Returns true if there are procedures available to process. NOTE: this method is called with the 198 * sched lock held. 199 * @return true if there are procedures available to process, otherwise false. 200 */ 201 protected abstract boolean queueHasRunnables(); 202 203 @Override 204 public int size() { 205 schedLock(); 206 try { 207 return queueSize(); 208 } finally { 209 schedUnlock(); 210 } 211 } 212 213 @Override 214 public boolean hasRunnables() { 215 schedLock(); 216 try { 217 return queueHasRunnables(); 218 } finally { 219 schedUnlock(); 220 } 221 } 222 223 // ============================================================================ 224 // TODO: Metrics 225 // ============================================================================ 226 public long getPollCalls() { 227 return pollCalls; 228 } 229 230 public long getNullPollCalls() { 231 return nullPollCalls; 232 } 233 234 // ========================================================================== 235 // Procedure Events 236 // ========================================================================== 237 238 /** 239 * Wake up all of the given events. Note that we first take scheduler lock and then wakeInternal() 240 * synchronizes on the event. Access should remain package-private. Use ProcedureEvent class to 241 * wake/suspend events. 242 * @param events the list of events to wake 243 */ 244 void wakeEvents(ProcedureEvent[] events) { 245 schedLock(); 246 try { 247 for (ProcedureEvent event : events) { 248 if (event == null) { 249 continue; 250 } 251 event.wakeInternal(this); 252 } 253 } finally { 254 schedUnlock(); 255 } 256 } 257 258 /** 259 * Wakes up given waiting procedures by pushing them back into scheduler queues. 260 * @return size of given {@code waitQueue}. 261 */ 262 protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) { 263 return lockAndQueue.wakeWaitingProcedures(this); 264 } 265 266 protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) { 267 lockAndQueue.addLast(proc); 268 } 269 270 protected void wakeProcedure(final Procedure procedure) { 271 LOG.trace("Wake {}", procedure); 272 push(procedure, /* addFront= */ true, /* notify= */false); 273 } 274 275 // ========================================================================== 276 // Internal helpers 277 // ========================================================================== 278 protected void schedLock() { 279 schedulerLock.lock(); 280 } 281 282 protected void schedUnlock() { 283 schedulerLock.unlock(); 284 } 285 286 protected void wakePollIfNeeded(final int waitingCount) { 287 if (waitingCount <= 0) { 288 return; 289 } 290 if (waitingCount == 1) { 291 schedWaitCond.signal(); 292 } else { 293 schedWaitCond.signalAll(); 294 } 295 } 296}