001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.TimeUnit;
027import org.apache.hadoop.hbase.errorhandling.ForeignException;
028import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
029import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
030import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
031import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
037
038/**
039 * A globally-barriered distributed procedure. This class encapsulates state and methods for
040 * tracking and managing a distributed procedure, as well as aborting if any member encounters a
041 * problem or if a cancellation is requested.
042 * <p>
043 * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
044 * method. The procedure contacts all members and waits for all subprocedures to execute
045 * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
046 * send acquisition info back to the coordinator. If all acquisitions at subprocedures succeed, the
047 * coordinator then will call {@link #sendGlobalBarrierReached()}. This notifies members to execute
048 * the {@link Subprocedure#insideBarrier()} method. The procedure is blocked until all
049 * {@link Subprocedure#insideBarrier} executions complete at the members. When
050 * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to the
051 * coordinator. Once all members complete, the coordinator calls
052 * {@link #sendGlobalBarrierComplete()}.
053 * <p>
054 * If errors are encountered remotely, they are forwarded to the coordinator, and
055 * {@link Subprocedure#cleanup(Exception)} is called.
056 * <p>
057 * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
058 * limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger an
059 * {@link ForeignException} to abort the procedure. This is particularly useful for situations when
060 * running a distributed {@link Subprocedure} so participants can avoid blocking for extreme amounts
061 * of time if one of the participants fails or takes a really long time (e.g. GC pause).
062 * <p>
063 * Users should generally not directly create or subclass instances of this. They are created for
064 * them implicitly via
065 * {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher, String, byte[], List)}}
066 */
067@InterfaceAudience.Private
068public class Procedure implements Callable<Void>, ForeignExceptionListener {
069  private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
070
071  //
072  // Arguments and naming
073  //
074
075  // Name of the procedure
076  final private String procName;
077  // Arguments for this procedure execution
078  final private byte[] args;
079
080  //
081  // Execution State
082  //
083  /** latch for waiting until all members have acquire in barrier state */
084  final CountDownLatch acquiredBarrierLatch;
085  /** latch for waiting until all members have executed and released their in barrier state */
086  final CountDownLatch releasedBarrierLatch;
087  /** latch for waiting until a procedure has completed */
088  final CountDownLatch completedLatch;
089  /** monitor to check for errors */
090  private final ForeignExceptionDispatcher monitor;
091
092  //
093  // Execution Timeout Handling.
094  //
095
096  /** frequency to check for errors (ms) */
097  protected final long wakeFrequency;
098  protected final TimeoutExceptionInjector timeoutInjector;
099
100  //
101  // Members' and Coordinator's state
102  //
103
104  /** lock to prevent nodes from acquiring and then releasing before we can track them */
105  private final Object joinBarrierLock = new Object();
106  private final List<String> acquiringMembers;
107  private final List<String> inBarrierMembers;
108  private final HashMap<String, byte[]> dataFromFinishedMembers;
109  private ProcedureCoordinator coord;
110
111  /**
112   * Creates a procedure. (FOR TESTING) {@link Procedure} state to be run by a
113   * {@link ProcedureCoordinator}.
114   * @param coord           coordinator to call back to for general errors (e.g.
115   *                        {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
116   * @param monitor         error monitor to check for external errors
117   * @param wakeFreq        frequency to check for errors while waiting
118   * @param timeout         amount of time to allow the procedure to run before cancelling
119   * @param procName        name of the procedure instance
120   * @param args            argument data associated with the procedure instance
121   * @param expectedMembers names of the expected members
122   */
123  public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
124    long timeout, String procName, byte[] args, List<String> expectedMembers) {
125    this.coord = coord;
126    this.acquiringMembers = new ArrayList<>(expectedMembers);
127    this.inBarrierMembers = new ArrayList<>(acquiringMembers.size());
128    this.dataFromFinishedMembers = new HashMap<>();
129    this.procName = procName;
130    this.args = args;
131    this.monitor = monitor;
132    this.wakeFrequency = wakeFreq;
133
134    int count = expectedMembers.size();
135    this.acquiredBarrierLatch = new CountDownLatch(count);
136    this.releasedBarrierLatch = new CountDownLatch(count);
137    this.completedLatch = new CountDownLatch(1);
138    this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
139  }
140
141  /**
142   * Create a procedure. Users should generally not directly create instances of this. They are
143   * created them implicitly via
144   * {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher, String, byte[], List)}}
145   * @param coord           coordinator to call back to for general errors (e.g.
146   *                        {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
147   * @param wakeFreq        frequency to check for errors while waiting
148   * @param timeout         amount of time to allow the procedure to run before cancelling
149   * @param procName        name of the procedure instance
150   * @param args            argument data associated with the procedure instance
151   * @param expectedMembers names of the expected members
152   */
153  public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout, String procName,
154    byte[] args, List<String> expectedMembers) {
155    this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args,
156      expectedMembers);
157  }
158
159  public String getName() {
160    return procName;
161  }
162
163  /**
164   * Returns String of the procedure members both trying to enter the barrier and already in barrier
165   */
166  public String getStatus() {
167    String waiting, done;
168    synchronized (joinBarrierLock) {
169      waiting = acquiringMembers.toString();
170      done = inBarrierMembers.toString();
171    }
172    return "Procedure " + procName + " { waiting=" + waiting + " done=" + done + " }";
173  }
174
175  /**
176   * Get the ForeignExceptionDispatcher
177   * @return the Procedure's monitor.
178   */
179  public ForeignExceptionDispatcher getErrorMonitor() {
180    return monitor;
181  }
182
183  /**
184   * This call is the main execution thread of the barriered procedure. It sends messages and
185   * essentially blocks until all procedure members acquire or later complete but periodically
186   * checks for foreign exceptions.
187   */
188  @Override
189  @SuppressWarnings("finally")
190  final public Void call() {
191    LOG.info("Starting procedure '" + procName + "'");
192    // start the timer
193    timeoutInjector.start();
194
195    // run the procedure
196    try {
197      // start by checking for error first
198      monitor.rethrowException();
199      LOG.debug("Procedure '" + procName + "' starting 'acquire'");
200      sendGlobalBarrierStart();
201
202      // wait for all the members to report acquisition
203      LOG.debug("Waiting for all members to 'acquire'");
204      waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
205      monitor.rethrowException();
206
207      LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
208      sendGlobalBarrierReached();
209
210      // wait for all members to report barrier release
211      LOG.debug("Waiting for all members to 'release'");
212      waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
213
214      // make sure we didn't get an error during in barrier execution and release
215      monitor.rethrowException();
216      LOG.info("Procedure '" + procName + "' execution completed");
217    } catch (Exception e) {
218      if (e instanceof InterruptedException) {
219        Thread.currentThread().interrupt();
220      }
221      String msg = "Procedure '" + procName + "' execution failed!";
222      LOG.error(msg, e);
223      receive(new ForeignException(getName(), e));
224    } finally {
225      LOG.debug("Running finish phase.");
226      sendGlobalBarrierComplete();
227      completedLatch.countDown();
228
229      // tell the timer we are done, if we get here successfully
230      timeoutInjector.complete();
231      return null;
232    }
233  }
234
235  /**
236   * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
237   * the {@link Subprocedure#acquireBarrier} step.
238   */
239  public void sendGlobalBarrierStart() throws ForeignException {
240    // start the procedure
241    LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
242    try {
243      // send procedure barrier start to specified list of members. cloning the list to avoid
244      // concurrent modification from the controller setting the prepared nodes
245      coord.getRpcs().sendGlobalBarrierAcquire(this, args,
246        Lists.newArrayList(this.acquiringMembers));
247    } catch (IOException e) {
248      coord.rpcConnectionFailure("Can't reach controller.", e);
249    } catch (IllegalArgumentException e) {
250      throw new ForeignException(getName(), e);
251    }
252  }
253
254  /**
255   * Sends a message to all members that the global barrier condition has been satisfied. This
256   * should only be executed after all members have completed its
257   * {@link Subprocedure#acquireBarrier()} call successfully. This triggers the member
258   * {@link Subprocedure#insideBarrier} method.
259   */
260  public void sendGlobalBarrierReached() throws ForeignException {
261    try {
262      // trigger to have member run {@link Subprocedure#insideBarrier}
263      coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
264    } catch (IOException e) {
265      coord.rpcConnectionFailure("Can't reach controller.", e);
266    }
267  }
268
269  /**
270   * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed.
271   * After this executes, the coordinator can assume that any state resources about this barrier
272   * procedure state has been released.
273   */
274  public void sendGlobalBarrierComplete() {
275    LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
276    try {
277      coord.getRpcs().resetMembers(this);
278    } catch (IOException e) {
279      coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
280    }
281  }
282
283  //
284  // Call backs from other external processes.
285  //
286
287  /**
288   * Call back triggered by an individual member upon successful local barrier acquisition
289   */
290  public void barrierAcquiredByMember(String member) {
291    LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName
292      + "' on coordinator");
293    if (this.acquiringMembers.contains(member)) {
294      synchronized (joinBarrierLock) {
295        if (this.acquiringMembers.remove(member)) {
296          this.inBarrierMembers.add(member);
297          acquiredBarrierLatch.countDown();
298        }
299      }
300      LOG.debug(
301        "Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
302    } else {
303      LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join."
304        + " Continuing on.");
305    }
306  }
307
308  /**
309   * Call back triggered by a individual member upon successful local in-barrier execution and
310   * release
311   */
312  public void barrierReleasedByMember(String member, byte[] dataFromMember) {
313    boolean removed = false;
314    synchronized (joinBarrierLock) {
315      removed = this.inBarrierMembers.remove(member);
316      if (removed) {
317        releasedBarrierLatch.countDown();
318      }
319    }
320    if (removed) {
321      LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
322        + "', counting down latch.  Waiting for " + releasedBarrierLatch.getCount() + " more");
323    } else {
324      LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
325        + "', but we weren't waiting on it to release!");
326    }
327    dataFromFinishedMembers.put(member, dataFromMember);
328  }
329
330  /**
331   * Waits until the entire procedure has globally completed, or has been aborted. If an exception
332   * is thrown the procedure may or not have run cleanup to trigger the completion latch yet.
333   */
334  public void waitForCompleted() throws ForeignException, InterruptedException {
335    waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
336  }
337
338  /**
339   * Waits until the entire procedure has globally completed, or has been aborted. If an exception
340   * is thrown the procedure may or not have run cleanup to trigger the completion latch yet.
341   * @return data returned from procedure members upon successfully completing subprocedure.
342   */
343  public HashMap<String, byte[]> waitForCompletedWithRet()
344    throws ForeignException, InterruptedException {
345    waitForCompleted();
346    return dataFromFinishedMembers;
347  }
348
349  /**
350   * Check if the entire procedure has globally completed, or has been aborted.
351   */
352  public boolean isCompleted() throws ForeignException {
353    // Rethrow exception if any
354    monitor.rethrowException();
355    return (completedLatch.getCount() == 0);
356  }
357
358  /**
359   * A callback that handles incoming ForeignExceptions.
360   */
361  @Override
362  public void receive(ForeignException e) {
363    monitor.receive(e);
364  }
365
366  /**
367   * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
368   * check for errors
369   * @param latch            latch to wait on
370   * @param monitor          monitor to check for errors while waiting
371   * @param wakeFrequency    frequency to wake up and check for errors (in
372   *                         {@link TimeUnit#MILLISECONDS})
373   * @param latchDescription description of the latch, for logging
374   * @throws ForeignException     type of error the monitor can throw, if the task fails
375   * @throws InterruptedException if we are interrupted while waiting on latch
376   */
377  public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
378    long wakeFrequency, String latchDescription) throws ForeignException, InterruptedException {
379    boolean released = false;
380    while (!released) {
381      if (monitor != null) {
382        monitor.rethrowException();
383      }
384      /*
385       * ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription +
386       * "' latch. (sleep:" + wakeFrequency + " ms)");
387       */
388      released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
389    }
390    // check error again in case an error raised during last wait
391    if (monitor != null) {
392      monitor.rethrowException();
393    }
394  }
395}