001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.TimeUnit;
027
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.apache.hadoop.hbase.errorhandling.ForeignException;
032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
033import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
034import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
035import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
036
037import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
038
039/**
040 * A globally-barriered distributed procedure.  This class encapsulates state and methods for
041 * tracking and managing a distributed procedure, as well as aborting if any member encounters
042 * a problem or if a cancellation is requested.
043 * <p>
044 * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
045 * method.  The procedure contacts all members and waits for all subprocedures to execute
046 * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
047 * send acquisition info back to the coordinator.  If all acquisitions at subprocedures succeed,
048 * the coordinator then will call {@link #sendGlobalBarrierReached()}.  This notifies members to
049 * execute the {@link Subprocedure#insideBarrier()} method.  The procedure is blocked until all
050 * {@link Subprocedure#insideBarrier} executions complete at the members.  When
051 * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to
052 * the coordinator.  Once all members complete, the coordinator calls
053 * {@link #sendGlobalBarrierComplete()}.
054 * <p>
055 * If errors are encountered remotely, they are forwarded to the coordinator, and
056 * {@link Subprocedure#cleanup(Exception)} is called.
057 * <p>
058 * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
059 * limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger
060 * an {@link ForeignException} to abort the procedure.  This is particularly useful for situations
061 * when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme
062 * amounts of time if one of the participants fails or takes a really long time (e.g. GC pause).
063 * <p>
064 * Users should generally not directly create or subclass instances of this.  They are created
065 * for them implicitly via {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher,
066 * String, byte[], List)}}
067 */
068@InterfaceAudience.Private
069public class Procedure implements Callable<Void>, ForeignExceptionListener {
070  private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
071
072  //
073  // Arguments and naming
074  //
075
076  // Name of the procedure
077  final private String procName;
078  // Arguments for this procedure execution
079  final private byte[] args;
080
081  //
082  // Execution State
083  //
084  /** latch for waiting until all members have acquire in barrier state */
085  final CountDownLatch acquiredBarrierLatch;
086  /** latch for waiting until all members have executed and released their in barrier state */
087  final CountDownLatch releasedBarrierLatch;
088  /** latch for waiting until a procedure has completed */
089  final CountDownLatch completedLatch;
090  /** monitor to check for errors */
091  private final ForeignExceptionDispatcher monitor;
092
093  //
094  // Execution Timeout Handling.
095  //
096
097  /** frequency to check for errors (ms) */
098  protected final long wakeFrequency;
099  protected final TimeoutExceptionInjector timeoutInjector;
100
101  //
102  // Members' and Coordinator's state
103  //
104
105  /** lock to prevent nodes from acquiring and then releasing before we can track them */
106  private final Object joinBarrierLock = new Object();
107  private final List<String> acquiringMembers;
108  private final List<String> inBarrierMembers;
109  private final HashMap<String, byte[]> dataFromFinishedMembers;
110  private ProcedureCoordinator coord;
111
112  /**
113   * Creates a procedure. (FOR TESTING)
114   *
115   * {@link Procedure} state to be run by a {@link ProcedureCoordinator}.
116   * @param coord coordinator to call back to for general errors (e.g.
117   *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
118   * @param monitor error monitor to check for external errors
119   * @param wakeFreq frequency to check for errors while waiting
120   * @param timeout amount of time to allow the procedure to run before cancelling
121   * @param procName name of the procedure instance
122   * @param args argument data associated with the procedure instance
123   * @param expectedMembers names of the expected members
124   */
125  public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
126      long timeout, String procName, byte[] args, List<String> expectedMembers) {
127    this.coord = coord;
128    this.acquiringMembers = new ArrayList<>(expectedMembers);
129    this.inBarrierMembers = new ArrayList<>(acquiringMembers.size());
130    this.dataFromFinishedMembers = new HashMap<>();
131    this.procName = procName;
132    this.args = args;
133    this.monitor = monitor;
134    this.wakeFrequency = wakeFreq;
135
136    int count = expectedMembers.size();
137    this.acquiredBarrierLatch = new CountDownLatch(count);
138    this.releasedBarrierLatch = new CountDownLatch(count);
139    this.completedLatch = new CountDownLatch(1);
140    this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
141  }
142
143  /**
144   * Create a procedure.
145   *
146   * Users should generally not directly create instances of this.  They are created them
147   * implicitly via {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher,
148   * String, byte[], List)}}
149   *
150   * @param coord coordinator to call back to for general errors (e.g.
151   *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
152   * @param wakeFreq frequency to check for errors while waiting
153   * @param timeout amount of time to allow the procedure to run before cancelling
154   * @param procName name of the procedure instance
155   * @param args argument data associated with the procedure instance
156   * @param expectedMembers names of the expected members
157   */
158  public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout,
159      String procName, byte[] args, List<String> expectedMembers) {
160    this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args,
161        expectedMembers);
162  }
163
164  public String getName() {
165    return procName;
166  }
167
168  /**
169   * @return String of the procedure members both trying to enter the barrier and already in barrier
170   */
171  public String getStatus() {
172    String waiting, done;
173    synchronized (joinBarrierLock) {
174      waiting = acquiringMembers.toString();
175      done = inBarrierMembers.toString();
176    }
177    return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
178  }
179
180  /**
181   * Get the ForeignExceptionDispatcher
182   * @return the Procedure's monitor.
183   */
184  public ForeignExceptionDispatcher getErrorMonitor() {
185    return monitor;
186  }
187
188  /**
189   * This call is the main execution thread of the barriered procedure.  It sends messages and
190   * essentially blocks until all procedure members acquire or later complete but periodically
191   * checks for foreign exceptions.
192   */
193  @Override
194  @SuppressWarnings("finally")
195  final public Void call() {
196    LOG.info("Starting procedure '" + procName + "'");
197    // start the timer
198    timeoutInjector.start();
199
200    // run the procedure
201    try {
202      // start by checking for error first
203      monitor.rethrowException();
204      LOG.debug("Procedure '" + procName + "' starting 'acquire'");
205      sendGlobalBarrierStart();
206
207      // wait for all the members to report acquisition
208      LOG.debug("Waiting for all members to 'acquire'");
209      waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
210      monitor.rethrowException();
211
212      LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
213      sendGlobalBarrierReached();
214
215      // wait for all members to report barrier release
216      LOG.debug("Waiting for all members to 'release'");
217      waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
218
219      // make sure we didn't get an error during in barrier execution and release
220      monitor.rethrowException();
221      LOG.info("Procedure '" + procName + "' execution completed");
222    } catch (Exception e) {
223      if (e instanceof InterruptedException) {
224        Thread.currentThread().interrupt();
225      }
226      String msg = "Procedure '" + procName +"' execution failed!";
227      LOG.error(msg, e);
228      receive(new ForeignException(getName(), e));
229    } finally {
230      LOG.debug("Running finish phase.");
231      sendGlobalBarrierComplete();
232      completedLatch.countDown();
233
234      // tell the timer we are done, if we get here successfully
235      timeoutInjector.complete();
236      return null;
237    }
238  }
239
240  /**
241   * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
242   * the {@link Subprocedure#acquireBarrier} step.
243   * @throws ForeignException
244   */
245  public void sendGlobalBarrierStart() throws ForeignException {
246    // start the procedure
247    LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
248    try {
249      // send procedure barrier start to specified list of members. cloning the list to avoid
250      // concurrent modification from the controller setting the prepared nodes
251      coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
252    } catch (IOException e) {
253      coord.rpcConnectionFailure("Can't reach controller.", e);
254    } catch (IllegalArgumentException e) {
255      throw new ForeignException(getName(), e);
256    }
257  }
258
259  /**
260   * Sends a message to all members that the global barrier condition has been satisfied.  This
261   * should only be executed after all members have completed its
262   * {@link Subprocedure#acquireBarrier()} call successfully.  This triggers the member
263   * {@link Subprocedure#insideBarrier} method.
264   * @throws ForeignException
265   */
266  public void sendGlobalBarrierReached() throws ForeignException {
267    try {
268      // trigger to have member run {@link Subprocedure#insideBarrier}
269      coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
270    } catch (IOException e) {
271      coord.rpcConnectionFailure("Can't reach controller.", e);
272    }
273  }
274
275  /**
276   * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed.
277   * After this executes, the coordinator can assume that any state resources about this barrier
278   * procedure state has been released.
279   */
280  public void sendGlobalBarrierComplete() {
281    LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
282    try {
283      coord.getRpcs().resetMembers(this);
284    } catch (IOException e) {
285      coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
286    }
287  }
288
289  //
290  // Call backs from other external processes.
291  //
292
293  /**
294   * Call back triggered by an individual member upon successful local barrier acquisition
295   * @param member
296   */
297  public void barrierAcquiredByMember(String member) {
298    LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName
299        + "' on coordinator");
300    if (this.acquiringMembers.contains(member)) {
301      synchronized (joinBarrierLock) {
302        if (this.acquiringMembers.remove(member)) {
303          this.inBarrierMembers.add(member);
304          acquiredBarrierLatch.countDown();
305        }
306      }
307      LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
308    } else {
309      LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
310          " Continuing on.");
311    }
312  }
313
314  /**
315   * Call back triggered by a individual member upon successful local in-barrier execution and
316   * release
317   * @param member
318   * @param dataFromMember
319   */
320  public void barrierReleasedByMember(String member, byte[] dataFromMember) {
321    boolean removed = false;
322    synchronized (joinBarrierLock) {
323      removed = this.inBarrierMembers.remove(member);
324      if (removed) {
325        releasedBarrierLatch.countDown();
326      }
327    }
328    if (removed) {
329      LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
330          + "', counting down latch.  Waiting for " + releasedBarrierLatch.getCount()
331          + " more");
332    } else {
333      LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
334          + "', but we weren't waiting on it to release!");
335    }
336    dataFromFinishedMembers.put(member, dataFromMember);
337  }
338
339  /**
340   * Waits until the entire procedure has globally completed, or has been aborted.  If an
341   * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
342   * yet.
343   * @throws ForeignException
344   * @throws InterruptedException
345   */
346  public void waitForCompleted() throws ForeignException, InterruptedException {
347    waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
348  }
349
350  /**
351   * Waits until the entire procedure has globally completed, or has been aborted.  If an
352   * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
353   * yet.
354   * @return data returned from procedure members upon successfully completing subprocedure.
355   * @throws ForeignException
356   * @throws InterruptedException
357   */
358  public HashMap<String, byte[]> waitForCompletedWithRet() throws ForeignException, InterruptedException {
359    waitForCompleted();
360    return dataFromFinishedMembers;
361  }
362
363  /**
364   * Check if the entire procedure has globally completed, or has been aborted.
365   * @throws ForeignException
366   */
367  public boolean isCompleted() throws ForeignException {
368    // Rethrow exception if any
369    monitor.rethrowException();
370    return (completedLatch.getCount() == 0);
371  }
372
373  /**
374   * A callback that handles incoming ForeignExceptions.
375   */
376  @Override
377  public void receive(ForeignException e) {
378    monitor.receive(e);
379  }
380
381  /**
382   * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
383   * check for errors
384   * @param latch latch to wait on
385   * @param monitor monitor to check for errors while waiting
386   * @param wakeFrequency frequency to wake up and check for errors (in
387   *          {@link TimeUnit#MILLISECONDS})
388   * @param latchDescription description of the latch, for logging
389   * @throws ForeignException type of error the monitor can throw, if the task fails
390   * @throws InterruptedException if we are interrupted while waiting on latch
391   */
392  public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
393      long wakeFrequency, String latchDescription) throws ForeignException,
394      InterruptedException {
395    boolean released = false;
396    while (!released) {
397      if (monitor != null) {
398        monitor.rethrowException();
399      }
400      /*
401      ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:"
402          + wakeFrequency + " ms)"); */
403      released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
404    }
405    // check error again in case an error raised during last wait
406    if (monitor != null) {
407      monitor.rethrowException();
408    }
409  }
410}