001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.util.Iterator;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.locks.Condition;
023import java.util.concurrent.locks.ReentrantLock;
024
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029@InterfaceAudience.Private
030public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
031  private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class);
032  private final ReentrantLock schedulerLock = new ReentrantLock();
033  private final Condition schedWaitCond = schedulerLock.newCondition();
034  private boolean running = false;
035
036  // TODO: metrics
037  private long pollCalls = 0;
038  private long nullPollCalls = 0;
039
040  @Override
041  public void start() {
042    schedLock();
043    try {
044      running = true;
045    } finally {
046      schedUnlock();
047    }
048  }
049
050  @Override
051  public void stop() {
052    schedLock();
053    try {
054      running = false;
055      schedWaitCond.signalAll();
056    } finally {
057      schedUnlock();
058    }
059  }
060
061  @Override
062  public void signalAll() {
063    schedLock();
064    try {
065      schedWaitCond.signalAll();
066    } finally {
067      schedUnlock();
068    }
069  }
070
071  // ==========================================================================
072  //  Add related
073  // ==========================================================================
074  /**
075   * Add the procedure to the queue.
076   * NOTE: this method is called with the sched lock held.
077   * @param procedure the Procedure to add
078   * @param addFront true if the item should be added to the front of the queue
079   */
080  protected abstract void enqueue(Procedure procedure, boolean addFront);
081
082  @Override
083  public void addFront(final Procedure procedure) {
084    push(procedure, true, true);
085  }
086
087  @Override
088  public void addFront(final Procedure procedure, boolean notify) {
089    push(procedure, true, notify);
090  }
091
092  @Override
093  public void addFront(Iterator<Procedure> procedureIterator) {
094    schedLock();
095    try {
096      int count = 0;
097      while (procedureIterator.hasNext()) {
098        Procedure procedure = procedureIterator.next();
099        if (LOG.isTraceEnabled()) {
100          LOG.trace("Wake " + procedure);
101        }
102        push(procedure, /* addFront= */ true, /* notify= */false);
103        count++;
104      }
105      wakePollIfNeeded(count);
106    } finally {
107      schedUnlock();
108    }
109  }
110
111  @Override
112  public void addBack(final Procedure procedure) {
113    push(procedure, false, true);
114  }
115
116  @Override
117  public void addBack(final Procedure procedure, boolean notify) {
118    push(procedure, false, notify);
119  }
120
121  protected void push(final Procedure procedure, final boolean addFront, final boolean notify) {
122    schedLock();
123    try {
124      enqueue(procedure, addFront);
125      if (notify) {
126        schedWaitCond.signal();
127      }
128    } finally {
129      schedUnlock();
130    }
131  }
132
133  // ==========================================================================
134  //  Poll related
135  // ==========================================================================
136  /**
137   * Fetch one Procedure from the queue
138   * NOTE: this method is called with the sched lock held.
139   * @return the Procedure to execute, or null if nothing is available.
140   */
141  protected abstract Procedure dequeue();
142
143  @Override
144  public Procedure poll() {
145    return poll(-1);
146  }
147
148  @Override
149  public Procedure poll(long timeout, TimeUnit unit) {
150    return poll(unit.toNanos(timeout));
151  }
152
153  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
154  public Procedure poll(final long nanos) {
155    schedLock();
156    try {
157      if (!running) {
158        LOG.debug("the scheduler is not running");
159        return null;
160      }
161
162      if (!queueHasRunnables()) {
163        // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller
164        // to take decisions after a wake/interruption.
165        if (nanos < 0) {
166          schedWaitCond.await();
167        } else {
168          schedWaitCond.awaitNanos(nanos);
169        }
170        if (!queueHasRunnables()) {
171          nullPollCalls++;
172          return null;
173        }
174      }
175      final Procedure pollResult = dequeue();
176
177      pollCalls++;
178      nullPollCalls += (pollResult == null) ? 1 : 0;
179      return pollResult;
180    } catch (InterruptedException e) {
181      Thread.currentThread().interrupt();
182      nullPollCalls++;
183      return null;
184    } finally {
185      schedUnlock();
186    }
187  }
188
189  // ==========================================================================
190  //  Utils
191  // ==========================================================================
192  /**
193   * Returns the number of elements in this queue.
194   * NOTE: this method is called with the sched lock held.
195   * @return the number of elements in this queue.
196   */
197  protected abstract int queueSize();
198
199  /**
200   * Returns true if there are procedures available to process.
201   * NOTE: this method is called with the sched lock held.
202   * @return true if there are procedures available to process, otherwise false.
203   */
204  protected abstract boolean queueHasRunnables();
205
206  @Override
207  public int size() {
208    schedLock();
209    try {
210      return queueSize();
211    } finally {
212      schedUnlock();
213    }
214  }
215
216  @Override
217  public boolean hasRunnables() {
218    schedLock();
219    try {
220      return queueHasRunnables();
221    } finally {
222      schedUnlock();
223    }
224  }
225
226  // ============================================================================
227  //  TODO: Metrics
228  // ============================================================================
229  public long getPollCalls() {
230    return pollCalls;
231  }
232
233  public long getNullPollCalls() {
234    return nullPollCalls;
235  }
236
237  // ==========================================================================
238  //  Procedure Events
239  // ==========================================================================
240
241  /**
242   * Wake up all of the given events.
243   * Note that we first take scheduler lock and then wakeInternal() synchronizes on the event.
244   * Access should remain package-private. Use ProcedureEvent class to wake/suspend events.
245   * @param events the list of events to wake
246   */
247  void wakeEvents(ProcedureEvent[] events) {
248    schedLock();
249    try {
250      for (ProcedureEvent event : events) {
251        if (event == null) {
252          continue;
253        }
254        event.wakeInternal(this);
255      }
256    } finally {
257      schedUnlock();
258    }
259  }
260
261  /**
262   * Wakes up given waiting procedures by pushing them back into scheduler queues.
263   * @return size of given {@code waitQueue}.
264   */
265  protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) {
266    return lockAndQueue.wakeWaitingProcedures(this);
267  }
268
269  protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) {
270    lockAndQueue.addLast(proc);
271  }
272
273  protected void wakeProcedure(final Procedure procedure) {
274    LOG.trace("Wake {}", procedure);
275    push(procedure, /* addFront= */ true, /* notify= */false);
276  }
277
278  // ==========================================================================
279  //  Internal helpers
280  // ==========================================================================
281  protected void schedLock() {
282    schedulerLock.lock();
283  }
284
285  protected void schedUnlock() {
286    schedulerLock.unlock();
287  }
288
289  protected void wakePollIfNeeded(final int waitingCount) {
290    if (waitingCount <= 0) {
291      return;
292    }
293    if (waitingCount == 1) {
294      schedWaitCond.signal();
295    } else {
296      schedWaitCond.signalAll();
297    }
298  }
299}