001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.util.Iterator; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.locks.Condition; 023import java.util.concurrent.locks.ReentrantLock; 024import org.apache.commons.lang3.builder.ToStringBuilder; 025import org.apache.commons.lang3.builder.ToStringStyle; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030@InterfaceAudience.Private 031public abstract class AbstractProcedureScheduler implements ProcedureScheduler { 032 private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class); 033 private final ReentrantLock schedulerLock = new ReentrantLock(); 034 private final Condition schedWaitCond = schedulerLock.newCondition(); 035 private boolean running = false; 036 037 // TODO: metrics 038 private long pollCalls = 0; 039 private long nullPollCalls = 0; 040 041 @Override 042 public void start() { 043 schedLock(); 044 try { 045 running = true; 046 } finally { 047 schedUnlock(); 048 } 049 } 050 051 @Override 052 public void stop() { 053 schedLock(); 054 try { 055 running = false; 056 schedWaitCond.signalAll(); 057 } finally { 058 schedUnlock(); 059 } 060 } 061 062 @Override 063 public void signalAll() { 064 schedLock(); 065 try { 066 schedWaitCond.signalAll(); 067 } finally { 068 schedUnlock(); 069 } 070 } 071 072 // ========================================================================== 073 // Add related 074 // ========================================================================== 075 /** 076 * Add the procedure to the queue. NOTE: this method is called with the sched lock held. 077 * @param procedure the Procedure to add 078 * @param addFront true if the item should be added to the front of the queue 079 */ 080 protected abstract void enqueue(Procedure procedure, boolean addFront); 081 082 @Override 083 public void addFront(final Procedure procedure) { 084 push(procedure, true, true); 085 } 086 087 @Override 088 public void addFront(final Procedure procedure, boolean notify) { 089 push(procedure, true, notify); 090 } 091 092 @Override 093 public void addFront(Iterator<Procedure> procedureIterator) { 094 schedLock(); 095 try { 096 int count = 0; 097 while (procedureIterator.hasNext()) { 098 Procedure procedure = procedureIterator.next(); 099 if (LOG.isTraceEnabled()) { 100 LOG.trace("Wake " + procedure); 101 } 102 push(procedure, /* addFront= */ true, /* notify= */false); 103 count++; 104 } 105 wakePollIfNeeded(count); 106 } finally { 107 schedUnlock(); 108 } 109 } 110 111 @Override 112 public void addBack(final Procedure procedure) { 113 push(procedure, false, true); 114 } 115 116 @Override 117 public void addBack(final Procedure procedure, boolean notify) { 118 push(procedure, false, notify); 119 } 120 121 protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { 122 schedLock(); 123 try { 124 enqueue(procedure, addFront); 125 if (notify) { 126 schedWaitCond.signal(); 127 } 128 } finally { 129 schedUnlock(); 130 } 131 } 132 133 // ========================================================================== 134 // Poll related 135 // ========================================================================== 136 /** 137 * Fetch one Procedure from the queue NOTE: this method is called with the sched lock held. 138 * @return the Procedure to execute, or null if nothing is available. 139 */ 140 protected abstract Procedure dequeue(); 141 142 @Override 143 public Procedure poll() { 144 return poll(-1); 145 } 146 147 @Override 148 public Procedure poll(long timeout, TimeUnit unit) { 149 return poll(unit.toNanos(timeout)); 150 } 151 152 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") 153 public Procedure poll(final long nanos) { 154 schedLock(); 155 try { 156 if (!running) { 157 LOG.debug("the scheduler is not running"); 158 return null; 159 } 160 161 if (!queueHasRunnables()) { 162 // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller 163 // to take decisions after a wake/interruption. 164 if (nanos < 0) { 165 schedWaitCond.await(); 166 } else { 167 schedWaitCond.awaitNanos(nanos); 168 } 169 if (!queueHasRunnables()) { 170 nullPollCalls++; 171 return null; 172 } 173 } 174 final Procedure pollResult = dequeue(); 175 176 pollCalls++; 177 nullPollCalls += (pollResult == null) ? 1 : 0; 178 return pollResult; 179 } catch (InterruptedException e) { 180 Thread.currentThread().interrupt(); 181 nullPollCalls++; 182 return null; 183 } finally { 184 schedUnlock(); 185 } 186 } 187 188 // ========================================================================== 189 // Utils 190 // ========================================================================== 191 /** 192 * Returns the number of elements in this queue. NOTE: this method is called with the sched lock 193 * held. 194 * @return the number of elements in this queue. 195 */ 196 protected abstract int queueSize(); 197 198 /** 199 * Returns true if there are procedures available to process. NOTE: this method is called with the 200 * sched lock held. 201 * @return true if there are procedures available to process, otherwise false. 202 */ 203 protected abstract boolean queueHasRunnables(); 204 205 @Override 206 public int size() { 207 schedLock(); 208 try { 209 return queueSize(); 210 } finally { 211 schedUnlock(); 212 } 213 } 214 215 @Override 216 public boolean hasRunnables() { 217 schedLock(); 218 try { 219 return queueHasRunnables(); 220 } finally { 221 schedUnlock(); 222 } 223 } 224 225 // ============================================================================ 226 // TODO: Metrics 227 // ============================================================================ 228 public long getPollCalls() { 229 return pollCalls; 230 } 231 232 public long getNullPollCalls() { 233 return nullPollCalls; 234 } 235 236 // ========================================================================== 237 // Procedure Events 238 // ========================================================================== 239 240 /** 241 * Wake up all the given events. Note that we first take scheduler lock and then wakeInternal() 242 * synchronizes on the event. Access should remain package-private. Use ProcedureEvent class to 243 * wake/suspend events. 244 * @param events the list of events to wake 245 */ 246 public void wakeEvents(ProcedureEvent[] events) { 247 schedLock(); 248 try { 249 for (ProcedureEvent event : events) { 250 if (event == null) { 251 continue; 252 } 253 event.wakeInternal(this); 254 } 255 } finally { 256 schedUnlock(); 257 } 258 } 259 260 /** 261 * Wakes up given waiting procedures by pushing them back into scheduler queues. 262 * @return size of given {@code waitQueue}. 263 */ 264 protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) { 265 return lockAndQueue.wakeWaitingProcedures(this); 266 } 267 268 protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) { 269 lockAndQueue.addLast(proc); 270 } 271 272 protected void wakeProcedure(final Procedure procedure) { 273 LOG.trace("Wake {}", procedure); 274 push(procedure, /* addFront= */ true, /* notify= */false); 275 } 276 277 // ========================================================================== 278 // Internal helpers 279 // ========================================================================== 280 protected void schedLock() { 281 schedulerLock.lock(); 282 } 283 284 protected void schedUnlock() { 285 schedulerLock.unlock(); 286 } 287 288 protected void wakePollIfNeeded(final int waitingCount) { 289 if (waitingCount <= 0) { 290 return; 291 } 292 if (waitingCount == 1) { 293 schedWaitCond.signal(); 294 } else { 295 schedWaitCond.signalAll(); 296 } 297 } 298 299 @Override 300 public String toString() { 301 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("running", running) 302 .build(); 303 } 304}