001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.util.Iterator;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.locks.Condition;
023import java.util.concurrent.locks.ReentrantLock;
024
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
030
031@InterfaceAudience.Private
032public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
033  private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class);
034  private final ReentrantLock schedulerLock = new ReentrantLock();
035  private final Condition schedWaitCond = schedulerLock.newCondition();
036  private boolean running = false;
037
038  // TODO: metrics
039  private long pollCalls = 0;
040  private long nullPollCalls = 0;
041
042  @Override
043  public void start() {
044    schedLock();
045    try {
046      running = true;
047    } finally {
048      schedUnlock();
049    }
050  }
051
052  @Override
053  public void stop() {
054    schedLock();
055    try {
056      running = false;
057      schedWaitCond.signalAll();
058    } finally {
059      schedUnlock();
060    }
061  }
062
063  @Override
064  public void signalAll() {
065    schedLock();
066    try {
067      schedWaitCond.signalAll();
068    } finally {
069      schedUnlock();
070    }
071  }
072
073  // ==========================================================================
074  //  Add related
075  // ==========================================================================
076  /**
077   * Add the procedure to the queue.
078   * NOTE: this method is called with the sched lock held.
079   * @param procedure the Procedure to add
080   * @param addFront true if the item should be added to the front of the queue
081   */
082  protected abstract void enqueue(Procedure procedure, boolean addFront);
083
084  @Override
085  public void addFront(final Procedure procedure) {
086    push(procedure, true, true);
087  }
088
089  @Override
090  public void addFront(final Procedure procedure, boolean notify) {
091    push(procedure, true, notify);
092  }
093
094  @Override
095  public void addFront(Iterator<Procedure> procedureIterator) {
096    schedLock();
097    try {
098      int count = 0;
099      while (procedureIterator.hasNext()) {
100        Procedure procedure = procedureIterator.next();
101        if (LOG.isTraceEnabled()) {
102          LOG.trace("Wake " + procedure);
103        }
104        push(procedure, /* addFront= */ true, /* notify= */false);
105        count++;
106      }
107      wakePollIfNeeded(count);
108    } finally {
109      schedUnlock();
110    }
111  }
112
113  @Override
114  public void addBack(final Procedure procedure) {
115    push(procedure, false, true);
116  }
117
118  @Override
119  public void addBack(final Procedure procedure, boolean notify) {
120    push(procedure, false, notify);
121  }
122
123  protected void push(final Procedure procedure, final boolean addFront, final boolean notify) {
124    schedLock();
125    try {
126      enqueue(procedure, addFront);
127      if (notify) {
128        schedWaitCond.signal();
129      }
130    } finally {
131      schedUnlock();
132    }
133  }
134
135  // ==========================================================================
136  //  Poll related
137  // ==========================================================================
138  /**
139   * Fetch one Procedure from the queue
140   * NOTE: this method is called with the sched lock held.
141   * @return the Procedure to execute, or null if nothing is available.
142   */
143  protected abstract Procedure dequeue();
144
145  @Override
146  public Procedure poll() {
147    return poll(-1);
148  }
149
150  @Override
151  public Procedure poll(long timeout, TimeUnit unit) {
152    return poll(unit.toNanos(timeout));
153  }
154
155  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
156  public Procedure poll(final long nanos) {
157    schedLock();
158    try {
159      if (!running) {
160        LOG.debug("the scheduler is not running");
161        return null;
162      }
163
164      if (!queueHasRunnables()) {
165        // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller
166        // to take decisions after a wake/interruption.
167        if (nanos < 0) {
168          schedWaitCond.await();
169        } else {
170          schedWaitCond.awaitNanos(nanos);
171        }
172        if (!queueHasRunnables()) {
173          nullPollCalls++;
174          return null;
175        }
176      }
177      final Procedure pollResult = dequeue();
178
179      pollCalls++;
180      nullPollCalls += (pollResult == null) ? 1 : 0;
181      return pollResult;
182    } catch (InterruptedException e) {
183      Thread.currentThread().interrupt();
184      nullPollCalls++;
185      return null;
186    } finally {
187      schedUnlock();
188    }
189  }
190
191  // ==========================================================================
192  //  Utils
193  // ==========================================================================
194  /**
195   * Returns the number of elements in this queue.
196   * NOTE: this method is called with the sched lock held.
197   * @return the number of elements in this queue.
198   */
199  protected abstract int queueSize();
200
201  /**
202   * Returns true if there are procedures available to process.
203   * NOTE: this method is called with the sched lock held.
204   * @return true if there are procedures available to process, otherwise false.
205   */
206  protected abstract boolean queueHasRunnables();
207
208  @Override
209  public int size() {
210    schedLock();
211    try {
212      return queueSize();
213    } finally {
214      schedUnlock();
215    }
216  }
217
218  @Override
219  public boolean hasRunnables() {
220    schedLock();
221    try {
222      return queueHasRunnables();
223    } finally {
224      schedUnlock();
225    }
226  }
227
228  // ============================================================================
229  //  TODO: Metrics
230  // ============================================================================
231  public long getPollCalls() {
232    return pollCalls;
233  }
234
235  public long getNullPollCalls() {
236    return nullPollCalls;
237  }
238
239  // ==========================================================================
240  //  Procedure Events
241  // ==========================================================================
242
243  /**
244   * Wake up all of the given events.
245   * Note that we first take scheduler lock and then wakeInternal() synchronizes on the event.
246   * Access should remain package-private. Use ProcedureEvent class to wake/suspend events.
247   * @param events the list of events to wake
248   */
249  @VisibleForTesting
250  public void wakeEvents(ProcedureEvent[] events) {
251    schedLock();
252    try {
253      for (ProcedureEvent event : events) {
254        if (event == null) {
255          continue;
256        }
257        event.wakeInternal(this);
258      }
259    } finally {
260      schedUnlock();
261    }
262  }
263
264  /**
265   * Wakes up given waiting procedures by pushing them back into scheduler queues.
266   * @return size of given {@code waitQueue}.
267   */
268  protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) {
269    return lockAndQueue.wakeWaitingProcedures(this);
270  }
271
272  protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) {
273    lockAndQueue.addLast(proc);
274  }
275
276  protected void wakeProcedure(final Procedure procedure) {
277    LOG.trace("Wake {}", procedure);
278    push(procedure, /* addFront= */ true, /* notify= */false);
279  }
280
281  // ==========================================================================
282  //  Internal helpers
283  // ==========================================================================
284  protected void schedLock() {
285    schedulerLock.lock();
286  }
287
288  protected void schedUnlock() {
289    schedulerLock.unlock();
290  }
291
292  protected void wakePollIfNeeded(final int waitingCount) {
293    if (waitingCount <= 0) {
294      return;
295    }
296    if (waitingCount == 1) {
297      schedWaitCond.signal();
298    } else {
299      schedWaitCond.signalAll();
300    }
301  }
302}