001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.util.Iterator;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.locks.Condition;
023import java.util.concurrent.locks.ReentrantLock;
024import org.apache.commons.lang3.builder.ToStringBuilder;
025import org.apache.commons.lang3.builder.ToStringStyle;
026import org.apache.yetus.audience.InterfaceAudience;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030@InterfaceAudience.Private
031public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
032  private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class);
033  private final ReentrantLock schedulerLock = new ReentrantLock();
034  private final Condition schedWaitCond = schedulerLock.newCondition();
035  private boolean running = false;
036
037  // TODO: metrics
038  private long pollCalls = 0;
039  private long nullPollCalls = 0;
040
041  @Override
042  public void start() {
043    schedLock();
044    try {
045      running = true;
046    } finally {
047      schedUnlock();
048    }
049  }
050
051  @Override
052  public void stop() {
053    schedLock();
054    try {
055      running = false;
056      schedWaitCond.signalAll();
057    } finally {
058      schedUnlock();
059    }
060  }
061
062  @Override
063  public void signalAll() {
064    schedLock();
065    try {
066      schedWaitCond.signalAll();
067    } finally {
068      schedUnlock();
069    }
070  }
071
072  // ==========================================================================
073  // Add related
074  // ==========================================================================
075  /**
076   * Add the procedure to the queue. NOTE: this method is called with the sched lock held.
077   * @param procedure the Procedure to add
078   * @param addFront  true if the item should be added to the front of the queue
079   */
080  protected abstract void enqueue(Procedure procedure, boolean addFront);
081
082  @Override
083  public void addFront(final Procedure procedure) {
084    push(procedure, true, true);
085  }
086
087  @Override
088  public void addFront(final Procedure procedure, boolean notify) {
089    push(procedure, true, notify);
090  }
091
092  @Override
093  public void addFront(Iterator<Procedure> procedureIterator) {
094    schedLock();
095    try {
096      int count = 0;
097      while (procedureIterator.hasNext()) {
098        Procedure procedure = procedureIterator.next();
099        if (LOG.isTraceEnabled()) {
100          LOG.trace("Wake " + procedure);
101        }
102        push(procedure, /* addFront= */ true, /* notify= */false);
103        count++;
104      }
105      wakePollIfNeeded(count);
106    } finally {
107      schedUnlock();
108    }
109  }
110
111  @Override
112  public void addBack(final Procedure procedure) {
113    push(procedure, false, true);
114  }
115
116  @Override
117  public void addBack(final Procedure procedure, boolean notify) {
118    push(procedure, false, notify);
119  }
120
121  protected void push(final Procedure procedure, final boolean addFront, final boolean notify) {
122    schedLock();
123    try {
124      enqueue(procedure, addFront);
125      if (notify) {
126        schedWaitCond.signal();
127      }
128    } finally {
129      schedUnlock();
130    }
131  }
132
133  // ==========================================================================
134  // Poll related
135  // ==========================================================================
136  /**
137   * Fetch one Procedure from the queue NOTE: this method is called with the sched lock held.
138   * @return the Procedure to execute, or null if nothing is available.
139   */
140  protected abstract Procedure dequeue();
141
142  @Override
143  public Procedure poll() {
144    return poll(-1);
145  }
146
147  @Override
148  public Procedure poll(long timeout, TimeUnit unit) {
149    return poll(unit.toNanos(timeout));
150  }
151
152  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
153  public Procedure poll(final long nanos) {
154    schedLock();
155    try {
156      if (!running) {
157        LOG.debug("the scheduler is not running");
158        return null;
159      }
160
161      if (!queueHasRunnables()) {
162        // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller
163        // to take decisions after a wake/interruption.
164        if (nanos < 0) {
165          schedWaitCond.await();
166        } else {
167          schedWaitCond.awaitNanos(nanos);
168        }
169        if (!queueHasRunnables()) {
170          nullPollCalls++;
171          return null;
172        }
173      }
174      final Procedure pollResult = dequeue();
175
176      pollCalls++;
177      nullPollCalls += (pollResult == null) ? 1 : 0;
178      return pollResult;
179    } catch (InterruptedException e) {
180      Thread.currentThread().interrupt();
181      nullPollCalls++;
182      return null;
183    } finally {
184      schedUnlock();
185    }
186  }
187
188  // ==========================================================================
189  // Utils
190  // ==========================================================================
191  /**
192   * Returns the number of elements in this queue. NOTE: this method is called with the sched lock
193   * held.
194   * @return the number of elements in this queue.
195   */
196  protected abstract int queueSize();
197
198  /**
199   * Returns true if there are procedures available to process. NOTE: this method is called with the
200   * sched lock held.
201   * @return true if there are procedures available to process, otherwise false.
202   */
203  protected abstract boolean queueHasRunnables();
204
205  @Override
206  public int size() {
207    schedLock();
208    try {
209      return queueSize();
210    } finally {
211      schedUnlock();
212    }
213  }
214
215  @Override
216  public boolean hasRunnables() {
217    schedLock();
218    try {
219      return queueHasRunnables();
220    } finally {
221      schedUnlock();
222    }
223  }
224
225  // ============================================================================
226  // TODO: Metrics
227  // ============================================================================
228  public long getPollCalls() {
229    return pollCalls;
230  }
231
232  public long getNullPollCalls() {
233    return nullPollCalls;
234  }
235
236  // ==========================================================================
237  // Procedure Events
238  // ==========================================================================
239
240  /**
241   * Wake up all the given events. Note that we first take scheduler lock and then wakeInternal()
242   * synchronizes on the event. Access should remain package-private. Use ProcedureEvent class to
243   * wake/suspend events.
244   * @param events the list of events to wake
245   */
246  public void wakeEvents(ProcedureEvent[] events) {
247    schedLock();
248    try {
249      for (ProcedureEvent event : events) {
250        if (event == null) {
251          continue;
252        }
253        event.wakeInternal(this);
254      }
255    } finally {
256      schedUnlock();
257    }
258  }
259
260  /**
261   * Wakes up given waiting procedures by pushing them back into scheduler queues.
262   * @return size of given {@code waitQueue}.
263   */
264  protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) {
265    return lockAndQueue.wakeWaitingProcedures(this);
266  }
267
268  protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) {
269    lockAndQueue.addLast(proc);
270  }
271
272  protected void wakeProcedure(final Procedure procedure) {
273    LOG.trace("Wake {}", procedure);
274    push(procedure, /* addFront= */ true, /* notify= */false);
275  }
276
277  // ==========================================================================
278  // Internal helpers
279  // ==========================================================================
280  protected void schedLock() {
281    schedulerLock.lock();
282  }
283
284  protected void schedUnlock() {
285    schedulerLock.unlock();
286  }
287
288  protected void wakePollIfNeeded(final int waitingCount) {
289    if (waitingCount <= 0) {
290      return;
291    }
292    if (waitingCount == 1) {
293      schedWaitCond.signal();
294    } else {
295      schedWaitCond.signalAll();
296    }
297  }
298
299  @Override
300  public String toString() {
301    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("running", running)
302      .build();
303  }
304}