View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.CountDownLatch;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.errorhandling.ForeignException;
32  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
33  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
34  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
35  import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
36  
37  import com.google.common.collect.Lists;
38  
39  /**
40   * A globally-barriered distributed procedure.  This class encapsulates state and methods for
41   * tracking and managing a distributed procedure, as well as aborting if any member encounters
42   * a problem or if a cancellation is requested.
43   * <p>
44   * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
45   * method.  The procedure contacts all members and waits for all subprocedures to execute
46   * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
47   * send acquisition info back to the coordinator.  If all acquisitions at subprocedures succeed,
48   * the coordinator then will call {@link #sendGlobalBarrierReached()}.  This notifies members to
49   * execute the {@link Subprocedure#insideBarrier()} method.  The procedure is blocked until all
50   * {@link Subprocedure#insideBarrier} executions complete at the members.  When
51   * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to
52   * the coordinator.  Once all members complete, the coordinator calls
53   * {@link #sendGlobalBarrierComplete()}.
54   * <p>
55   * If errors are encountered remotely, they are forwarded to the coordinator, and
56   * {@link Subprocedure#cleanup(Exception)} is called.
57   * <p>
58   * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
59   * limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger
60   * an {@link ForeignException} to abort the procedure.  This is particularly useful for situations
61   * when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme
62   * amounts of time if one of the participants fails or takes a really long time (e.g. GC pause).
63   * <p>
64   * Users should generally not directly create or subclass instances of this.  They are created
65   * for them implicitly via {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher,
66   * String, byte[], List)}}
67   */
68  @InterfaceAudience.Private
69  public class Procedure implements Callable<Void>, ForeignExceptionListener {
70    private static final Log LOG = LogFactory.getLog(Procedure.class);
71  
72    //
73    // Arguments and naming
74    //
75  
76    // Name of the procedure
77    final private String procName;
78    // Arguments for this procedure execution
79    final private byte[] args;
80  
81    //
82    // Execution State
83    //
84    /** latch for waiting until all members have acquire in barrier state */
85    final CountDownLatch acquiredBarrierLatch;
86    /** latch for waiting until all members have executed and released their in barrier state */
87    final CountDownLatch releasedBarrierLatch;
88    /** latch for waiting until a procedure has completed */
89    final CountDownLatch completedLatch;
90    /** monitor to check for errors */
91    private final ForeignExceptionDispatcher monitor;
92  
93    //
94    // Execution Timeout Handling.
95    //
96  
97    /** frequency to check for errors (ms) */
98    protected final long wakeFrequency;
99    protected final TimeoutExceptionInjector timeoutInjector;
100 
101   //
102   // Members' and Coordinator's state
103   //
104 
105   /** lock to prevent nodes from acquiring and then releasing before we can track them */
106   private Object joinBarrierLock = new Object();
107   private final List<String> acquiringMembers;
108   private final List<String> inBarrierMembers;
109   private final HashMap<String, byte[]> dataFromFinishedMembers;
110   private ProcedureCoordinator coord;
111 
112   /**
113    * Creates a procedure. (FOR TESTING)
114    *
115    * {@link Procedure} state to be run by a {@link ProcedureCoordinator}.
116    * @param coord coordinator to call back to for general errors (e.g.
117    *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
118    * @param monitor error monitor to check for external errors
119    * @param wakeFreq frequency to check for errors while waiting
120    * @param timeout amount of time to allow the procedure to run before cancelling
121    * @param procName name of the procedure instance
122    * @param args argument data associated with the procedure instance
123    * @param expectedMembers names of the expected members
124    */
125   public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
126       long timeout, String procName, byte[] args, List<String> expectedMembers) {
127     this.coord = coord;
128     this.acquiringMembers = new ArrayList<String>(expectedMembers);
129     this.inBarrierMembers = new ArrayList<String>(acquiringMembers.size());
130     this.dataFromFinishedMembers = new HashMap<String, byte[]>();
131     this.procName = procName;
132     this.args = args;
133     this.monitor = monitor;
134     this.wakeFrequency = wakeFreq;
135 
136     int count = expectedMembers.size();
137     this.acquiredBarrierLatch = new CountDownLatch(count);
138     this.releasedBarrierLatch = new CountDownLatch(count);
139     this.completedLatch = new CountDownLatch(1);
140     this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
141   }
142 
143   /**
144    * Create a procedure.
145    *
146    * Users should generally not directly create instances of this.  They are created them
147    * implicitly via {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher,
148    * String, byte[], List)}}
149    *
150    * @param coord coordinator to call back to for general errors (e.g.
151    *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
152    * @param wakeFreq frequency to check for errors while waiting
153    * @param timeout amount of time to allow the procedure to run before cancelling
154    * @param procName name of the procedure instance
155    * @param args argument data associated with the procedure instance
156    * @param expectedMembers names of the expected members
157    */
158   public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout,
159       String procName, byte[] args, List<String> expectedMembers) {
160     this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args,
161         expectedMembers);
162   }
163 
164   public String getName() {
165     return procName;
166   }
167 
168   /**
169    * @return String of the procedure members both trying to enter the barrier and already in barrier
170    */
171   public String getStatus() {
172     String waiting, done;
173     synchronized (joinBarrierLock) {
174       waiting = acquiringMembers.toString();
175       done = inBarrierMembers.toString();
176     }
177     return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
178   }
179 
180   /**
181    * Get the ForeignExceptionDispatcher
182    * @return the Procedure's monitor.
183    */
184   public ForeignExceptionDispatcher getErrorMonitor() {
185     return monitor;
186   }
187 
188   /**
189    * This call is the main execution thread of the barriered procedure.  It sends messages and
190    * essentially blocks until all procedure members acquire or later complete but periodically
191    * checks for foreign exceptions.
192    */
193   @Override
194   @SuppressWarnings("finally")
195   final public Void call() {
196     LOG.info("Starting procedure '" + procName + "'");
197     // start the timer
198     timeoutInjector.start();
199 
200     // run the procedure
201     try {
202       // start by checking for error first
203       monitor.rethrowException();
204       LOG.debug("Procedure '" + procName + "' starting 'acquire'");
205       sendGlobalBarrierStart();
206 
207       // wait for all the members to report acquisition
208       LOG.debug("Waiting for all members to 'acquire'");
209       waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
210       monitor.rethrowException();
211 
212       LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
213       sendGlobalBarrierReached();
214 
215       // wait for all members to report barrier release
216       LOG.debug("Waiting for all members to 'release'");
217       waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
218 
219       // make sure we didn't get an error during in barrier execution and release
220       monitor.rethrowException();
221       LOG.info("Procedure '" + procName + "' execution completed");
222     } catch (Exception e) {
223       if (e instanceof InterruptedException) {
224         Thread.currentThread().interrupt();
225       }
226       String msg = "Procedure '" + procName +"' execution failed!";
227       LOG.error(msg, e);
228       receive(new ForeignException(getName(), e));
229     } finally {
230       LOG.debug("Running finish phase.");
231       sendGlobalBarrierComplete();
232       completedLatch.countDown();
233 
234       // tell the timer we are done, if we get here successfully
235       timeoutInjector.complete();
236       return null;
237     }
238   }
239 
240   /**
241    * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
242    * the {@link Subprocedure#acquireBarrier} step.
243    * @throws ForeignException
244    */
245   public void sendGlobalBarrierStart() throws ForeignException {
246     // start the procedure
247     LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
248     try {
249       // send procedure barrier start to specified list of members. cloning the list to avoid
250       // concurrent modification from the controller setting the prepared nodes
251       coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
252     } catch (IOException e) {
253       coord.rpcConnectionFailure("Can't reach controller.", e);
254     } catch (IllegalArgumentException e) {
255       throw new ForeignException(getName(), e);
256     }
257   }
258 
259   /**
260    * Sends a message to all members that the global barrier condition has been satisfied.  This
261    * should only be executed after all members have completed its
262    * {@link Subprocedure#acquireBarrier()} call successfully.  This triggers the member
263    * {@link Subprocedure#insideBarrier} method.
264    * @throws ForeignException
265    */
266   public void sendGlobalBarrierReached() throws ForeignException {
267     try {
268       // trigger to have member run {@link Subprocedure#insideBarrier}
269       coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
270     } catch (IOException e) {
271       coord.rpcConnectionFailure("Can't reach controller.", e);
272     }
273   }
274 
275   /**
276    * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed.
277    * After this executes, the coordinator can assume that any state resources about this barrier
278    * procedure state has been released.
279    */
280   public void sendGlobalBarrierComplete() {
281     LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
282     try {
283       coord.getRpcs().resetMembers(this);
284     } catch (IOException e) {
285       coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
286     }
287   }
288 
289   //
290   // Call backs from other external processes.
291   //
292 
293   /**
294    * Call back triggered by an individual member upon successful local barrier acquisition
295    * @param member
296    */
297   public void barrierAcquiredByMember(String member) {
298     LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName
299         + "' on coordinator");
300     if (this.acquiringMembers.contains(member)) {
301       synchronized (joinBarrierLock) {
302         if (this.acquiringMembers.remove(member)) {
303           this.inBarrierMembers.add(member);
304           acquiredBarrierLatch.countDown();
305         }
306       }
307       LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
308     } else {
309       LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
310           " Continuing on.");
311     }
312   }
313 
314   /**
315    * Call back triggered by a individual member upon successful local in-barrier execution and
316    * release
317    * @param member
318    * @param dataFromMember
319    */
320   public void barrierReleasedByMember(String member, byte[] dataFromMember) {
321     boolean removed = false;
322     synchronized (joinBarrierLock) {
323       removed = this.inBarrierMembers.remove(member);
324       if (removed) {
325         releasedBarrierLatch.countDown();
326       }
327     }
328     if (removed) {
329       LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
330           + "', counting down latch.  Waiting for " + releasedBarrierLatch.getCount()
331           + " more");
332     } else {
333       LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
334           + "', but we weren't waiting on it to release!");
335     }
336     dataFromFinishedMembers.put(member, dataFromMember);
337   }
338 
339   /**
340    * Waits until the entire procedure has globally completed, or has been aborted.  If an
341    * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
342    * yet.
343    * @throws ForeignException
344    * @throws InterruptedException
345    */
346   public void waitForCompleted() throws ForeignException, InterruptedException {
347     waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
348   }
349 
350   /**
351    * Waits until the entire procedure has globally completed, or has been aborted.  If an
352    * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
353    * yet.
354    * @return data returned from procedure members upon successfully completing subprocedure.
355    * @throws ForeignException
356    * @throws InterruptedException
357    */
358   public HashMap<String, byte[]> waitForCompletedWithRet() throws ForeignException, InterruptedException {
359     waitForCompleted();
360     return dataFromFinishedMembers;
361   }
362 
363   /**
364    * Check if the entire procedure has globally completed, or has been aborted.
365    * @throws ForeignException
366    */
367   public boolean isCompleted() throws ForeignException {
368     // Rethrow exception if any
369     monitor.rethrowException();
370     return (completedLatch.getCount() == 0);
371   }
372 
373   /**
374    * A callback that handles incoming ForeignExceptions.
375    */
376   @Override
377   public void receive(ForeignException e) {
378     monitor.receive(e);
379   }
380 
381   /**
382    * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
383    * check for errors
384    * @param latch latch to wait on
385    * @param monitor monitor to check for errors while waiting
386    * @param wakeFrequency frequency to wake up and check for errors (in
387    *          {@link TimeUnit#MILLISECONDS})
388    * @param latchDescription description of the latch, for logging
389    * @throws ForeignException type of error the monitor can throw, if the task fails
390    * @throws InterruptedException if we are interrupted while waiting on latch
391    */
392   public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
393       long wakeFrequency, String latchDescription) throws ForeignException,
394       InterruptedException {
395     boolean released = false;
396     while (!released) {
397       if (monitor != null) {
398         monitor.rethrowException();
399       }
400       /*
401       ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:"
402           + wakeFrequency + " ms)"); */
403       released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
404     }
405     // check error again in case an error raised during last wait
406     if (monitor != null) {
407       monitor.rethrowException();
408     }
409   }
410 }