001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.util.Iterator;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.locks.Condition;
023import java.util.concurrent.locks.ReentrantLock;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028@InterfaceAudience.Private
029public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
030  private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class);
031  private final ReentrantLock schedulerLock = new ReentrantLock();
032  private final Condition schedWaitCond = schedulerLock.newCondition();
033  private boolean running = false;
034
035  // TODO: metrics
036  private long pollCalls = 0;
037  private long nullPollCalls = 0;
038
039  @Override
040  public void start() {
041    schedLock();
042    try {
043      running = true;
044    } finally {
045      schedUnlock();
046    }
047  }
048
049  @Override
050  public void stop() {
051    schedLock();
052    try {
053      running = false;
054      schedWaitCond.signalAll();
055    } finally {
056      schedUnlock();
057    }
058  }
059
060  @Override
061  public void signalAll() {
062    schedLock();
063    try {
064      schedWaitCond.signalAll();
065    } finally {
066      schedUnlock();
067    }
068  }
069
070  // ==========================================================================
071  // Add related
072  // ==========================================================================
073  /**
074   * Add the procedure to the queue. NOTE: this method is called with the sched lock held.
075   * @param procedure the Procedure to add
076   * @param addFront  true if the item should be added to the front of the queue
077   */
078  protected abstract void enqueue(Procedure procedure, boolean addFront);
079
080  @Override
081  public void addFront(final Procedure procedure) {
082    push(procedure, true, true);
083  }
084
085  @Override
086  public void addFront(final Procedure procedure, boolean notify) {
087    push(procedure, true, notify);
088  }
089
090  @Override
091  public void addFront(Iterator<Procedure> procedureIterator) {
092    schedLock();
093    try {
094      int count = 0;
095      while (procedureIterator.hasNext()) {
096        Procedure procedure = procedureIterator.next();
097        if (LOG.isTraceEnabled()) {
098          LOG.trace("Wake " + procedure);
099        }
100        push(procedure, /* addFront= */ true, /* notify= */false);
101        count++;
102      }
103      wakePollIfNeeded(count);
104    } finally {
105      schedUnlock();
106    }
107  }
108
109  @Override
110  public void addBack(final Procedure procedure) {
111    push(procedure, false, true);
112  }
113
114  @Override
115  public void addBack(final Procedure procedure, boolean notify) {
116    push(procedure, false, notify);
117  }
118
119  protected void push(final Procedure procedure, final boolean addFront, final boolean notify) {
120    schedLock();
121    try {
122      enqueue(procedure, addFront);
123      if (notify) {
124        schedWaitCond.signal();
125      }
126    } finally {
127      schedUnlock();
128    }
129  }
130
131  // ==========================================================================
132  // Poll related
133  // ==========================================================================
134  /**
135   * Fetch one Procedure from the queue NOTE: this method is called with the sched lock held.
136   * @return the Procedure to execute, or null if nothing is available.
137   */
138  protected abstract Procedure dequeue();
139
140  @Override
141  public Procedure poll() {
142    return poll(-1);
143  }
144
145  @Override
146  public Procedure poll(long timeout, TimeUnit unit) {
147    return poll(unit.toNanos(timeout));
148  }
149
150  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
151  public Procedure poll(final long nanos) {
152    schedLock();
153    try {
154      if (!running) {
155        LOG.debug("the scheduler is not running");
156        return null;
157      }
158
159      if (!queueHasRunnables()) {
160        // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller
161        // to take decisions after a wake/interruption.
162        if (nanos < 0) {
163          schedWaitCond.await();
164        } else {
165          schedWaitCond.awaitNanos(nanos);
166        }
167        if (!queueHasRunnables()) {
168          nullPollCalls++;
169          return null;
170        }
171      }
172      final Procedure pollResult = dequeue();
173
174      pollCalls++;
175      nullPollCalls += (pollResult == null) ? 1 : 0;
176      return pollResult;
177    } catch (InterruptedException e) {
178      Thread.currentThread().interrupt();
179      nullPollCalls++;
180      return null;
181    } finally {
182      schedUnlock();
183    }
184  }
185
186  // ==========================================================================
187  // Utils
188  // ==========================================================================
189  /**
190   * Returns the number of elements in this queue. NOTE: this method is called with the sched lock
191   * held.
192   * @return the number of elements in this queue.
193   */
194  protected abstract int queueSize();
195
196  /**
197   * Returns true if there are procedures available to process. NOTE: this method is called with the
198   * sched lock held.
199   * @return true if there are procedures available to process, otherwise false.
200   */
201  protected abstract boolean queueHasRunnables();
202
203  @Override
204  public int size() {
205    schedLock();
206    try {
207      return queueSize();
208    } finally {
209      schedUnlock();
210    }
211  }
212
213  @Override
214  public boolean hasRunnables() {
215    schedLock();
216    try {
217      return queueHasRunnables();
218    } finally {
219      schedUnlock();
220    }
221  }
222
223  // ============================================================================
224  // TODO: Metrics
225  // ============================================================================
226  public long getPollCalls() {
227    return pollCalls;
228  }
229
230  public long getNullPollCalls() {
231    return nullPollCalls;
232  }
233
234  // ==========================================================================
235  // Procedure Events
236  // ==========================================================================
237
238  /**
239   * Wake up all of the given events. Note that we first take scheduler lock and then wakeInternal()
240   * synchronizes on the event. Access should remain package-private. Use ProcedureEvent class to
241   * wake/suspend events.
242   * @param events the list of events to wake
243   */
244  void wakeEvents(ProcedureEvent[] events) {
245    schedLock();
246    try {
247      for (ProcedureEvent event : events) {
248        if (event == null) {
249          continue;
250        }
251        event.wakeInternal(this);
252      }
253    } finally {
254      schedUnlock();
255    }
256  }
257
258  /**
259   * Wakes up given waiting procedures by pushing them back into scheduler queues.
260   * @return size of given {@code waitQueue}.
261   */
262  protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) {
263    return lockAndQueue.wakeWaitingProcedures(this);
264  }
265
266  protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) {
267    lockAndQueue.addLast(proc);
268  }
269
270  protected void wakeProcedure(final Procedure procedure) {
271    LOG.trace("Wake {}", procedure);
272    push(procedure, /* addFront= */ true, /* notify= */false);
273  }
274
275  // ==========================================================================
276  // Internal helpers
277  // ==========================================================================
278  protected void schedLock() {
279    schedulerLock.lock();
280  }
281
282  protected void schedUnlock() {
283    schedulerLock.unlock();
284  }
285
286  protected void wakePollIfNeeded(final int waitingCount) {
287    if (waitingCount <= 0) {
288      return;
289    }
290    if (waitingCount == 1) {
291      schedWaitCond.signal();
292    } else {
293      schedWaitCond.signalAll();
294    }
295  }
296}