001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2;
020
021import java.util.Iterator;
022import java.util.concurrent.locks.Condition;
023import java.util.concurrent.locks.ReentrantLock;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030@InterfaceAudience.Private
031public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
032  private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class);
033  private final ReentrantLock schedulerLock = new ReentrantLock();
034  private final Condition schedWaitCond = schedulerLock.newCondition();
035  private boolean running = false;
036
037  // TODO: metrics
038  private long pollCalls = 0;
039  private long nullPollCalls = 0;
040
041  @Override
042  public void start() {
043    schedLock();
044    try {
045      running = true;
046    } finally {
047      schedUnlock();
048    }
049  }
050
051  @Override
052  public void stop() {
053    schedLock();
054    try {
055      running = false;
056      schedWaitCond.signalAll();
057    } finally {
058      schedUnlock();
059    }
060  }
061
062  @Override
063  public void signalAll() {
064    schedLock();
065    try {
066      schedWaitCond.signalAll();
067    } finally {
068      schedUnlock();
069    }
070  }
071
072  // ==========================================================================
073  //  Add related
074  // ==========================================================================
075  /**
076   * Add the procedure to the queue.
077   * NOTE: this method is called with the sched lock held.
078   * @param procedure the Procedure to add
079   * @param addFront true if the item should be added to the front of the queue
080   */
081  protected abstract void enqueue(Procedure procedure, boolean addFront);
082
083  @Override
084  public void addFront(final Procedure procedure) {
085    push(procedure, true, true);
086  }
087
088  @Override
089  public void addFront(final Procedure procedure, boolean notify) {
090    push(procedure, true, notify);
091  }
092
093  @Override
094  public void addFront(Iterator<Procedure> procedureIterator) {
095    schedLock();
096    try {
097      int count = 0;
098      while (procedureIterator.hasNext()) {
099        Procedure procedure = procedureIterator.next();
100        if (LOG.isTraceEnabled()) {
101          LOG.trace("Wake " + procedure);
102        }
103        push(procedure, /* addFront= */ true, /* notify= */false);
104        count++;
105      }
106      wakePollIfNeeded(count);
107    } finally {
108      schedUnlock();
109    }
110  }
111
112  @Override
113  public void addBack(final Procedure procedure) {
114    push(procedure, false, true);
115  }
116
117  @Override
118  public void addBack(final Procedure procedure, boolean notify) {
119    push(procedure, false, notify);
120  }
121
122  protected void push(final Procedure procedure, final boolean addFront, final boolean notify) {
123    schedLock();
124    try {
125      enqueue(procedure, addFront);
126      if (notify) {
127        schedWaitCond.signalAll();
128      }
129    } finally {
130      schedUnlock();
131    }
132  }
133
134  // ==========================================================================
135  //  Poll related
136  // ==========================================================================
137  /**
138   * Fetch one Procedure from the queue
139   * NOTE: this method is called with the sched lock held.
140   * @return the Procedure to execute, or null if nothing is available.
141   */
142  protected Procedure dequeue() {
143    return dequeue(false);
144  }
145
146  protected abstract Procedure dequeue(boolean onlyUrgent);
147
148
149  @Override
150  public Procedure poll(boolean onlyUrgent) {
151    return poll(onlyUrgent, -1);
152  }
153
154  @Override
155  public Procedure poll() {
156    return poll(false, -1);
157  }
158
159  @Override
160  public Procedure poll(boolean onlyUrgent, long timeout, TimeUnit unit) {
161    return poll(onlyUrgent, unit.toNanos(timeout));
162  }
163
164  @Override
165  public Procedure poll(long timeout, TimeUnit unit) {
166    return poll(false, unit.toNanos(timeout));
167  }
168
169  public Procedure poll(final long nanos) {
170    return poll(false, nanos);
171  }
172
173  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
174  public Procedure poll(final boolean onlyUrgent, final long nanos) {
175    schedLock();
176    try {
177      if (!running) {
178        LOG.debug("the scheduler is not running");
179        return null;
180      }
181
182      if (!queueHasRunnables()) {
183        // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller
184        // to take decisions after a wake/interruption.
185        if (nanos < 0) {
186          schedWaitCond.await();
187        } else {
188          schedWaitCond.awaitNanos(nanos);
189        }
190        if (!queueHasRunnables()) {
191          nullPollCalls++;
192          return null;
193        }
194      }
195      final Procedure pollResult = dequeue(onlyUrgent);
196
197      pollCalls++;
198      nullPollCalls += (pollResult == null) ? 1 : 0;
199      return pollResult;
200    } catch (InterruptedException e) {
201      Thread.currentThread().interrupt();
202      nullPollCalls++;
203      return null;
204    } finally {
205      schedUnlock();
206    }
207  }
208
209  // ==========================================================================
210  //  Utils
211  // ==========================================================================
212  /**
213   * Returns the number of elements in this queue.
214   * NOTE: this method is called with the sched lock held.
215   * @return the number of elements in this queue.
216   */
217  protected abstract int queueSize();
218
219  /**
220   * Returns true if there are procedures available to process.
221   * NOTE: this method is called with the sched lock held.
222   * @return true if there are procedures available to process, otherwise false.
223   */
224  protected abstract boolean queueHasRunnables();
225
226  @Override
227  public int size() {
228    schedLock();
229    try {
230      return queueSize();
231    } finally {
232      schedUnlock();
233    }
234  }
235
236  @Override
237  public boolean hasRunnables() {
238    schedLock();
239    try {
240      return queueHasRunnables();
241    } finally {
242      schedUnlock();
243    }
244  }
245
246  // ============================================================================
247  //  TODO: Metrics
248  // ============================================================================
249  public long getPollCalls() {
250    return pollCalls;
251  }
252
253  public long getNullPollCalls() {
254    return nullPollCalls;
255  }
256
257  // ==========================================================================
258  //  Procedure Events
259  // ==========================================================================
260
261  /**
262   * Wake up all of the given events.
263   * Note that we first take scheduler lock and then wakeInternal() synchronizes on the event.
264   * Access should remain package-private. Use ProcedureEvent class to wake/suspend events.
265   * @param events the list of events to wake
266   */
267  void wakeEvents(ProcedureEvent[] events) {
268    schedLock();
269    try {
270      for (ProcedureEvent event : events) {
271        if (event == null) {
272          continue;
273        }
274        event.wakeInternal(this);
275      }
276    } finally {
277      schedUnlock();
278    }
279  }
280
281  /**
282   * Wakes up given waiting procedures by pushing them back into scheduler queues.
283   * @return size of given {@code waitQueue}.
284   */
285  protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) {
286    return lockAndQueue.wakeWaitingProcedures(this);
287  }
288
289  protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) {
290    lockAndQueue.addLast(proc);
291  }
292
293  protected void wakeProcedure(final Procedure procedure) {
294    LOG.trace("Wake {}", procedure);
295    push(procedure, /* addFront= */ true, /* notify= */false);
296  }
297
298
299  // ==========================================================================
300  //  Internal helpers
301  // ==========================================================================
302  protected void schedLock() {
303    schedulerLock.lock();
304  }
305
306  protected void schedUnlock() {
307    schedulerLock.unlock();
308  }
309
310  protected void wakePollIfNeeded(final int waitingCount) {
311    if (waitingCount <= 0) {
312      return;
313    }
314    schedWaitCond.signalAll();
315  }
316}