View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.CountDownLatch;
25  import java.util.concurrent.TimeUnit;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.classification.InterfaceAudience;
30  import org.apache.hadoop.classification.InterfaceStability;
31  import org.apache.hadoop.hbase.errorhandling.ForeignException;
32  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
33  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
34  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
35  import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
36  
37  import com.google.common.collect.Lists;
38  
39  /**
40   * A globally-barriered distributed procedure.  This class encapsulates state and methods for
41   * tracking and managing a distributed procedure, as well as aborting if any member encounters
42   * a problem or if a cancellation is requested.
43   * <p>
44   * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
45   * method.  The procedure contacts all members and waits for all subprocedures to execute
46   * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
47   * send acquisition info back to the coordinator.  If all acquisitions at subprocedures succeed,
48   * the coordinator then will call {@link #sendGlobalBarrierReached()}.  This notifies members to
49   * execute the {@link Subprocedure#insideBarrier()} method.  The procedure is blocked until all
50   * {@link Subprocedure#insideBarrier} executions complete at the members.  When
51   * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to
52   * the coordinator.  Once all members complete, the coordinator calls
53   * {@link #sendGlobalBarrierComplete()}.
54   * <p>
55   * If errors are encountered remotely, they are forwarded to the coordinator, and
56   * {@link Subprocedure#cleanup(Exception)} is called.
57   * <p>
58   * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
59   * limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger
60   * an {@link ForeignException} to abort the procedure.  This is particularly useful for situations
61   * when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme
62   * amounts of time if one of the participants fails or takes a really long time (e.g. GC pause).
63   * <p>
64   * Users should generally not directly create or subclass instances of this.  They are created
65   * for them implicitly via {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher,
66   * String, byte[], List)}}
67   */
68  @InterfaceAudience.Public
69  @InterfaceStability.Evolving
70  public class Procedure implements Callable<Void>, ForeignExceptionListener {
71    private static final Log LOG = LogFactory.getLog(Procedure.class);
72  
73    //
74    // Arguments and naming
75    //
76  
77    // Name of the procedure
78    final private String procName;
79    // Arguments for this procedure execution
80    final private byte[] args;
81  
82    //
83    // Execution State
84    //
85    /** latch for waiting until all members have acquire in barrier state */
86    final CountDownLatch acquiredBarrierLatch;
87    /** latch for waiting until all members have executed and released their in barrier state */
88    final CountDownLatch releasedBarrierLatch;
89    /** latch for waiting until a procedure has completed */
90    final CountDownLatch completedLatch;
91    /** monitor to check for errors */
92    private final ForeignExceptionDispatcher monitor;
93  
94    //
95    // Execution Timeout Handling.
96    //
97  
98    /** frequency to check for errors (ms) */
99    protected final long wakeFrequency;
100   protected final TimeoutExceptionInjector timeoutInjector;
101 
102   //
103   // Members' and Coordinator's state
104   //
105 
106   /** lock to prevent nodes from acquiring and then releasing before we can track them */
107   private Object joinBarrierLock = new Object();
108   private final List<String> acquiringMembers;
109   private final List<String> inBarrierMembers;
110   private ProcedureCoordinator coord;
111 
112   /**
113    * Creates a procedure. (FOR TESTING)
114    *
115    * {@link Procedure} state to be run by a {@link ProcedureCoordinator}.
116    * @param coord coordinator to call back to for general errors (e.g.
117    *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
118    * @param monitor error monitor to check for external errors
119    * @param wakeFreq frequency to check for errors while waiting
120    * @param timeout amount of time to allow the procedure to run before cancelling
121    * @param procName name of the procedure instance
122    * @param args argument data associated with the procedure instance
123    * @param expectedMembers names of the expected members
124    */
125   public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
126       long timeout, String procName, byte[] args, List<String> expectedMembers) {
127     this.coord = coord;
128     this.acquiringMembers = new ArrayList<String>(expectedMembers);
129     this.inBarrierMembers = new ArrayList<String>(acquiringMembers.size());
130     this.procName = procName;
131     this.args = args;
132     this.monitor = monitor;
133     this.wakeFrequency = wakeFreq;
134 
135     int count = expectedMembers.size();
136     this.acquiredBarrierLatch = new CountDownLatch(count);
137     this.releasedBarrierLatch = new CountDownLatch(count);
138     this.completedLatch = new CountDownLatch(1);
139     this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
140   }
141 
142   /**
143    * Create a procedure.
144    *
145    * Users should generally not directly create instances of this.  They are created them
146    * implicitly via {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher,
147    * String, byte[], List)}}
148    *
149    * @param coord coordinator to call back to for general errors (e.g.
150    *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
151    * @param wakeFreq frequency to check for errors while waiting
152    * @param timeout amount of time to allow the procedure to run before cancelling
153    * @param procName name of the procedure instance
154    * @param args argument data associated with the procedure instance
155    * @param expectedMembers names of the expected members
156    */
157   public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout,
158       String procName, byte[] args, List<String> expectedMembers) {
159     this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args,
160         expectedMembers);
161   }
162 
163   public String getName() {
164     return procName;
165   }
166 
167   /**
168    * @return String of the procedure members both trying to enter the barrier and already in barrier
169    */
170   public String getStatus() {
171     String waiting, done;
172     synchronized (joinBarrierLock) {
173       waiting = acquiringMembers.toString();
174       done = inBarrierMembers.toString();
175     }
176     return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
177   }
178   
179   /**
180    * Get the ForeignExceptionDispatcher
181    * @return the Procedure's monitor.
182    */
183   public ForeignExceptionDispatcher getErrorMonitor() {
184     return monitor;
185   }
186 
187   /**
188    * This call is the main execution thread of the barriered procedure.  It sends messages and
189    * essentially blocks until all procedure members acquire or later complete but periodically
190    * checks for foreign exceptions.
191    */
192   @Override
193   @SuppressWarnings("finally")
194   final public Void call() {
195     LOG.info("Starting procedure '" + procName + "'");
196     // start the timer
197     timeoutInjector.start();
198 
199     // run the procedure
200     try {
201       // start by checking for error first
202       monitor.rethrowException();
203       LOG.debug("Procedure '" + procName + "' starting 'acquire'");
204       sendGlobalBarrierStart();
205 
206       // wait for all the members to report acquisition
207       LOG.debug("Waiting for all members to 'acquire'");
208       waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
209       monitor.rethrowException();
210 
211       LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
212       sendGlobalBarrierReached();
213 
214       // wait for all members to report barrier release
215       waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
216 
217       // make sure we didn't get an error during in barrier execution and release
218       monitor.rethrowException();
219       LOG.info("Procedure '" + procName + "' execution completed");
220     } catch (Exception e) {
221       if (e instanceof InterruptedException) {
222         Thread.currentThread().interrupt();
223       }
224       String msg = "Procedure '" + procName +"' execution failed!";
225       LOG.error(msg, e);
226       receive(new ForeignException(getName(), e));
227     } finally {
228       LOG.debug("Running finish phase.");
229       sendGlobalBarrierComplete();
230       completedLatch.countDown();
231 
232       // tell the timer we are done, if we get here successfully
233       timeoutInjector.complete();
234       return null;
235     }
236   }
237 
238   /**
239    * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
240    * the {@link Subprocedure#acquireBarrier} step.
241    * @throws ForeignException
242    */
243   public void sendGlobalBarrierStart() throws ForeignException {
244     // start the procedure
245     LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
246     try {
247       // send procedure barrier start to specified list of members. cloning the list to avoid
248       // concurrent modification from the controller setting the prepared nodes
249       coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
250     } catch (IOException e) {
251       coord.rpcConnectionFailure("Can't reach controller.", e);
252     } catch (IllegalArgumentException e) {
253       throw new ForeignException(getName(), e);
254     }
255   }
256 
257   /**
258    * Sends a message to all members that the global barrier condition has been satisfied.  This
259    * should only be executed after all members have completed its
260    * {@link Subprocedure#acquireBarrier()} call successfully.  This triggers the member
261    * {@link Subprocedure#insideBarrier} method.
262    * @throws ForeignException
263    */
264   public void sendGlobalBarrierReached() throws ForeignException {
265     try {
266       // trigger to have member run {@link Subprocedure#insideBarrier}
267       coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
268     } catch (IOException e) {
269       coord.rpcConnectionFailure("Can't reach controller.", e);
270     }
271   }
272 
273   /**
274    * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed.
275    * After this executes, the coordinator can assume that any state resources about this barrier
276    * procedure state has been released.
277    */
278   public void sendGlobalBarrierComplete() {
279     LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
280     try {
281       coord.getRpcs().resetMembers(this);
282     } catch (IOException e) {
283       coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
284     }
285   }
286 
287   //
288   // Call backs from other external processes.
289   //
290 
291   /**
292    * Call back triggered by an individual member upon successful local barrier acquisition
293    * @param member
294    */
295   public void barrierAcquiredByMember(String member) {
296     LOG.debug("member: '" + member + "' joining prepared barrier for procedure '" + procName
297         + "' on coordinator");
298     if (this.acquiringMembers.contains(member)) {
299       synchronized (joinBarrierLock) {
300         if (this.acquiringMembers.remove(member)) {
301           this.inBarrierMembers.add(member);
302           acquiredBarrierLatch.countDown();
303         }
304       }
305       LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
306     } else {
307       LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
308           " Continuing on.");
309     }
310   }
311 
312   /**
313    * Call back triggered by a individual member upon successful local in-barrier execution and
314    * release
315    * @param member
316    */
317   public void barrierReleasedByMember(String member) {
318     boolean removed = false;
319     synchronized (joinBarrierLock) {
320       removed = this.inBarrierMembers.remove(member);
321       if (removed) {
322         releasedBarrierLatch.countDown();
323       }
324     }
325     if (removed) {
326       LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
327           + "', counting down latch.  Waiting for " + releasedBarrierLatch.getCount()
328           + " more");
329     } else {
330       LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
331           + "', but we weren't waiting on it to release!");
332     }
333   }
334 
335   /**
336    * Waits until the entire procedure has globally completed, or has been aborted.
337    * @throws ForeignException
338    * @throws InterruptedException
339    */
340   public void waitForCompleted() throws ForeignException, InterruptedException {
341     waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
342   }
343 
344   /**
345    * A callback that handles incoming ForeignExceptions.
346    */
347   @Override
348   public void receive(ForeignException e) {
349     monitor.receive(e);
350   }
351 
352   /**
353    * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
354    * check for errors
355    * @param latch latch to wait on
356    * @param monitor monitor to check for errors while waiting
357    * @param wakeFrequency frequency to wake up and check for errors (in
358    *          {@link TimeUnit#MILLISECONDS})
359    * @param latchDescription description of the latch, for logging
360    * @throws ForeignException type of error the monitor can throw, if the task fails
361    * @throws InterruptedException if we are interrupted while waiting on latch
362    */
363   public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
364       long wakeFrequency, String latchDescription) throws ForeignException,
365       InterruptedException {
366     boolean released = false;
367     while (!released) {
368       if (monitor != null) {
369         monitor.rethrowException();
370       }
371       /*
372       ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:"
373           + wakeFrequency + " ms)"); */
374       released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
375     }
376   }
377 }